added queuable work units for tasking to threads
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Thu, 21 Oct 2021 19:02:12 +0000 (14:02 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Thu, 21 Oct 2021 19:02:12 +0000 (14:02 -0500)
site/js/onyx-loader.js
site/js/webgl2.js
src/app/app.onyx
src/app/debug_log.onyx
src/app/work_units.onyx [new file with mode: 0644]
src/build.onyx
src/features/wasm/wasm.onyx

index e60ae573048d720db9f335b89ab4e5f24c7a1088..bb2514d4eccc535b7851c1c85a3b7e2caa1e33ae 100644 (file)
@@ -4,6 +4,7 @@ window.ONYX_MEMORY   = null;
 window.ONYX_INSTANCE = null;
 window.ONYX_BYTES    = null;
 window.ONYX_THREAD_SCRIPT = "onyx-thread.js";
+window.ONYX_WORKERS  = {};
 
 window.ONYX_MODULES.push({
     module_name: "host",
@@ -28,8 +29,8 @@ window.ONYX_MODULES.push({
                 }
             }
 
-            const worker = new Worker(window.ONYX_THREAD_SCRIPT);
-            worker.postMessage({
+            window.ONYX_WORKERS[id] = new Worker(window.ONYX_THREAD_SCRIPT);
+            window.ONYX_WORKERS[id].postMessage({
                 thread_id  : id,
                 memory     : window.ONYX_MEMORY,
                 wasm_bytes : window.ONYX_BYTES,
@@ -45,6 +46,16 @@ window.ONYX_MODULES.push({
             return 0;
         }
     },
+
+    kill_thread(id) {
+        if (window.ONYX_WORKERS[id] == null) return 0;
+
+        window.ONYX_WORKERS[id].terminate();
+        delete window.ONYX_WORKERS[id];
+        ONYX_WORKERS[id] = null;
+
+        return 1;
+    },
 });
 
 function onyx_decode_text(ptr, len) {
index 6964651b794447977e0903c6ac98bc01548a917f..32c28af204d5cc4e9961fb7c4b8535ee397d36eb 100644 (file)
@@ -284,6 +284,14 @@ window.ONYX_MODULES.push({
     uniform3i(loc, x, y, z) { gl.uniform3i(uniformlocs[loc], x, y, z); },
     uniform4f(loc, x, y, z, w) { gl.uniform4f(uniformlocs[loc], x, y, z, w); },
     uniform4i(loc, x, y, z, w) { gl.uniform4i(uniformlocs[loc], x, y, z, w); },
+    uniform1iv(loc, valueptr, valuelen) { gl.uniform1iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer,   valueptr, valuelen)); },
+    uniform1fv(loc, valueptr, valuelen) { gl.uniform1fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen)); },
+    uniform2iv(loc, valueptr, valuelen) { gl.uniform2iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer,   valueptr, valuelen * 2)); },
+    uniform2fv(loc, valueptr, valuelen) { gl.uniform2fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 2)); },
+    uniform3iv(loc, valueptr, valuelen) { gl.uniform3iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer,   valueptr, valuelen * 3)); },
+    uniform3fv(loc, valueptr, valuelen) { gl.uniform3fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 3)); },
+    uniform4iv(loc, valueptr, valuelen) { gl.uniform4iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer,   valueptr, valuelen * 4)); },
+    uniform4fv(loc, valueptr, valuelen) { gl.uniform4fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 4)); },
     uniformMatrix2(loc, transpose, valueptr) {
         const data = new Float32Array(window.ONYX_MEMORY.buffer, valueptr, 4);
         gl.uniformMatrix2fv(uniformlocs[loc], transpose, data);
index c7b4d839d12f162b634b330a273396b6e7c59a2a..a08216640058053d3f6c9505c20d1c1c90930bf0 100644 (file)
@@ -61,6 +61,8 @@ Application_Window :: struct {
 init :: () {
     debug_init();
 
+    work_units_initialize();
+
     __initialize(^state);
     window_buffer := memory.make_slice(Application_Window, MAXIMUM_WINDOWS);
     state.windows_store = alloc.pool.make(window_buffer);
@@ -196,30 +198,15 @@ init :: () {
         }
     }
 
-    use package core.intrinsics.atomics
-
-    #persist #thread_local t  : thread.Thread;
-    #persist #thread_local t2 : thread.Thread;
-    #persist barrier : sync.Barrier;
-    sync.barrier_init(^barrier, 2);
-
-    thread.spawn(^t, null, (_: ^i32) {
-        printf("Hello from another thread!\n");
+    #persist value := 0;
+    work_unit_submit(^value, (x: ^i32) -> Work_Unit_Status {
+        use package core.intrinsics.atomics
+        __atomic_wait(cast(^i32) null, 0, 1000000000);
 
-        for i: 1000 {
-            sync.barrier_wait(^barrier);
-            printf("Hello from another thread! {}\n", i);
-        }
-    });
+        printf("Value: {}\n", *x);
+        *x += 1;
 
-    thread.spawn(^t2, null, (_: ^i32) {
-        i := 1;
-        while true {
-            i *= 2;
-            __atomic_wait(_, 0, 1000000000);
-            sync.barrier_wait(^barrier);
-            debug_log(.Warning, "Debug message: {}\n", i);
-        }
+        return (.Not_Done) if *x < 100 else .Failed;
     });
 }
 
index d743fd194daad0544d069ec1ebc0479cd7d81505..fa054ec20f9d5804c9939f00f98db990d64956f9 100644 (file)
@@ -3,6 +3,7 @@ package debug
 use package core
 
 #private_file {
+    app :: package app
     ui  :: package ui
     gfx :: package immediate_mode
     config :: package config
@@ -122,6 +123,18 @@ draw_debug_log :: (window_rectangle: ui.Rectangle, site := #callsite) {
     if ui.button(clear_button_rect, "Clear log") {
         debug_log_clear();
     }
+
+    r = clear_button_rect;
+    r.y0 += 50;
+    r.y1 += 25;
+
+    workers_used := app.workers_being_used();
+    for i: workers_used.count {
+        ui.draw_rect(r, (.{0,0,1}) if workers_used[i] else .{0,0,0.2});
+
+        r.x0 += (200.0f / ~~workers_used.count);
+        r.x1  = r.x0 + (200.0f / ~~workers_used.count);
+    }
 }
 
 #private_file log_buffer : struct {
diff --git a/src/app/work_units.onyx b/src/app/work_units.onyx
new file mode 100644 (file)
index 0000000..8730559
--- /dev/null
@@ -0,0 +1,125 @@
+package app
+
+use package core
+use package debug { debug_log }
+
+Work_Unit_Status :: enum {
+    Success;
+    Failed;
+    Not_Done;
+}
+
+Work_Unit :: struct {
+    func: (rawptr) -> Work_Unit_Status;
+    data: rawptr;
+
+    id   : i32;
+    next : ^Work_Unit;
+}
+
+work_units_initialize :: () {
+    work_unit_buffer = memory.make_slice(Work_Unit, MAXIMUM_WORK_UNITS);
+    work_unit_pool   = alloc.pool.make(work_unit_buffer);
+
+    sync.mutex_init(^work_unit_mutex);
+    sync.semaphore_init(^work_unit_signal, 0);
+
+    next_work_unit_id = 1;
+
+    for i: WORKER_COUNT {
+        worker_data[i].id = i;
+        thread.spawn(^worker_threads[i], ^worker_data[i], work_unit_processor);
+    }
+}
+
+work_unit_submit :: (data: rawptr, func: (rawptr) -> Work_Unit_Status) {
+    sync.mutex_lock(^work_unit_mutex);
+    work_unit := alloc.pool.pool_alloc(^work_unit_pool);
+
+    work_unit.func = func;
+    work_unit.data = data;
+    work_unit.id   = next_work_unit_id;
+    work_unit.next = null;
+    next_work_unit_id += 1;
+
+    work_unit_insert(work_unit);
+    sync.mutex_unlock(^work_unit_mutex);
+}
+
+work_unit_insert :: (w: ^Work_Unit) {
+    sync.mutex_lock(^work_unit_mutex);
+
+    if first_work_unit == null {
+        first_work_unit = w;
+    } else {
+        work_unit := first_work_unit;
+        while work_unit.next != null do work_unit = work_unit.next;
+
+        work_unit.next = w;
+    }
+
+    sync.semaphore_post(^work_unit_signal, 1);
+    sync.mutex_unlock(^work_unit_mutex);
+}
+
+workers_being_used :: () -> [WORKER_COUNT] bool {
+    used: [WORKER_COUNT] bool;
+    for i: WORKER_COUNT {
+        used[i] = worker_data[i].working;
+    }
+
+    return used;
+}
+
+
+#private_file {
+
+WORKER_COUNT       :: 4
+MAXIMUM_WORK_UNITS :: 64
+
+Worker_Data :: struct {
+    id      : i32;
+    working : bool;
+}
+
+worker_data    : [WORKER_COUNT] Worker_Data;
+worker_threads : [WORKER_COUNT] thread.Thread;
+
+work_unit_buffer : [] Work_Unit;
+work_unit_pool   : alloc.pool.PoolAllocator(Work_Unit);
+
+next_work_unit_id : i32;
+first_work_unit   : ^Work_Unit;
+
+work_unit_mutex  : sync.Mutex;
+work_unit_signal : sync.Semaphore;
+
+work_unit_processor :: (thread_data: ^Worker_Data) {
+    while true {
+        sync.semaphore_wait(^work_unit_signal);
+        thread_data.working = true;
+
+        sync.mutex_lock(^work_unit_mutex);
+        work_unit := first_work_unit;
+        assert(work_unit != null, "work_unit was null.");
+        first_work_unit = first_work_unit.next;
+        sync.mutex_unlock(^work_unit_mutex);
+
+        // debug_log(.Debug, "Tasked {} on worker {}.", work_unit.func, thread_data.id);
+
+        result := work_unit.func(work_unit.data);
+        switch result {
+            case .Failed {
+                debug_log(.Error, "Work unit {} failed.", next_work_unit_id);
+            }
+
+            case .Not_Done {
+                work_unit_insert(work_unit);
+            }
+        }
+
+        thread_data.working = false;
+    }
+}
+
+}
\ No newline at end of file
index 357a3390154b8164ba59b12964870c34bc2ec070..12b6b7676f1683d6f2aab744e829ecd69fd044f0 100644 (file)
@@ -24,6 +24,7 @@
     #load "src/app/storage"
     #load "src/app/settings"
     #load "src/app/editor"
+    #load "src/app/work_units"
 
     #load "src/features/load_features"
 
index d82e49b12c1aa3b743d1480e8c265cb971cf9852..88f01957449a07d127c0689ca581eec2defe45de 100644 (file)
@@ -84,7 +84,9 @@ info_window_draw :: (_, win) => {
     }
 }
 
-file_loaded :: () {
+wasm_processing_thread: thread.Thread;
+
+process_file :: (_) => {
     if wasm_analyzed {
         wasm_utils.free(^wasm_state);
         wasm_utils.free_sections(^wasm_sections);
@@ -95,7 +97,7 @@ file_loaded :: () {
     file_data := app.state.file.data;
     debug_log(.Info, "Wasm feature noticed file dropped with size {}", file_data.count);
 
-    if !string.starts_with(file_data, u8.[ 0, #char "a", #char "s", #char "m" ]) do return;
+    if !string.starts_with(file_data, u8.[ 0, #char "a", #char "s", #char "m" ]) do return app.Work_Unit_Status.Failed;
 
     debug_log(.Info, "THIS IS PROBABLY A WASM BINARY. ANALYZING!!!");
 
@@ -109,4 +111,11 @@ file_loaded :: () {
     }
 
     wasm_analyzed = true;
+
+    return app.Work_Unit_Status.Success;
+}
+
+file_loaded :: () {
+    app.work_unit_submit(null, process_file);
+    // thread.spawn(^wasm_processing_thread, null, process_file);
 }