From 7306262e4fa18e87bad4413a460ecd55266197d0 Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Thu, 21 Oct 2021 14:02:12 -0500 Subject: [PATCH] added queuable work units for tasking to threads --- site/js/onyx-loader.js | 15 ++++- site/js/webgl2.js | 8 +++ src/app/app.onyx | 31 +++------ src/app/debug_log.onyx | 13 ++++ src/app/work_units.onyx | 125 ++++++++++++++++++++++++++++++++++++ src/build.onyx | 1 + src/features/wasm/wasm.onyx | 13 +++- 7 files changed, 180 insertions(+), 26 deletions(-) create mode 100644 src/app/work_units.onyx diff --git a/site/js/onyx-loader.js b/site/js/onyx-loader.js index e60ae57..bb2514d 100644 --- a/site/js/onyx-loader.js +++ b/site/js/onyx-loader.js @@ -4,6 +4,7 @@ window.ONYX_MEMORY = null; window.ONYX_INSTANCE = null; window.ONYX_BYTES = null; window.ONYX_THREAD_SCRIPT = "onyx-thread.js"; +window.ONYX_WORKERS = {}; window.ONYX_MODULES.push({ module_name: "host", @@ -28,8 +29,8 @@ window.ONYX_MODULES.push({ } } - const worker = new Worker(window.ONYX_THREAD_SCRIPT); - worker.postMessage({ + window.ONYX_WORKERS[id] = new Worker(window.ONYX_THREAD_SCRIPT); + window.ONYX_WORKERS[id].postMessage({ thread_id : id, memory : window.ONYX_MEMORY, wasm_bytes : window.ONYX_BYTES, @@ -45,6 +46,16 @@ window.ONYX_MODULES.push({ return 0; } }, + + kill_thread(id) { + if (window.ONYX_WORKERS[id] == null) return 0; + + window.ONYX_WORKERS[id].terminate(); + delete window.ONYX_WORKERS[id]; + ONYX_WORKERS[id] = null; + + return 1; + }, }); function onyx_decode_text(ptr, len) { diff --git a/site/js/webgl2.js b/site/js/webgl2.js index 6964651..32c28af 100644 --- a/site/js/webgl2.js +++ b/site/js/webgl2.js @@ -284,6 +284,14 @@ window.ONYX_MODULES.push({ uniform3i(loc, x, y, z) { gl.uniform3i(uniformlocs[loc], x, y, z); }, uniform4f(loc, x, y, z, w) { gl.uniform4f(uniformlocs[loc], x, y, z, w); }, uniform4i(loc, x, y, z, w) { gl.uniform4i(uniformlocs[loc], x, y, z, w); }, + uniform1iv(loc, valueptr, valuelen) { gl.uniform1iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen)); }, + uniform1fv(loc, valueptr, valuelen) { gl.uniform1fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen)); }, + uniform2iv(loc, valueptr, valuelen) { gl.uniform2iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 2)); }, + uniform2fv(loc, valueptr, valuelen) { gl.uniform2fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 2)); }, + uniform3iv(loc, valueptr, valuelen) { gl.uniform3iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 3)); }, + uniform3fv(loc, valueptr, valuelen) { gl.uniform3fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 3)); }, + uniform4iv(loc, valueptr, valuelen) { gl.uniform4iv(uniformlocs[loc], new Int32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 4)); }, + uniform4fv(loc, valueptr, valuelen) { gl.uniform4fv(uniformlocs[loc], new Float32Array(window.ONYX_MEMORY.buffer, valueptr, valuelen * 4)); }, uniformMatrix2(loc, transpose, valueptr) { const data = new Float32Array(window.ONYX_MEMORY.buffer, valueptr, 4); gl.uniformMatrix2fv(uniformlocs[loc], transpose, data); diff --git a/src/app/app.onyx b/src/app/app.onyx index c7b4d83..a082166 100644 --- a/src/app/app.onyx +++ b/src/app/app.onyx @@ -61,6 +61,8 @@ Application_Window :: struct { init :: () { debug_init(); + work_units_initialize(); + __initialize(^state); window_buffer := memory.make_slice(Application_Window, MAXIMUM_WINDOWS); state.windows_store = alloc.pool.make(window_buffer); @@ -196,30 +198,15 @@ init :: () { } } - use package core.intrinsics.atomics - - #persist #thread_local t : thread.Thread; - #persist #thread_local t2 : thread.Thread; - #persist barrier : sync.Barrier; - sync.barrier_init(^barrier, 2); - - thread.spawn(^t, null, (_: ^i32) { - printf("Hello from another thread!\n"); + #persist value := 0; + work_unit_submit(^value, (x: ^i32) -> Work_Unit_Status { + use package core.intrinsics.atomics + __atomic_wait(cast(^i32) null, 0, 1000000000); - for i: 1000 { - sync.barrier_wait(^barrier); - printf("Hello from another thread! {}\n", i); - } - }); + printf("Value: {}\n", *x); + *x += 1; - thread.spawn(^t2, null, (_: ^i32) { - i := 1; - while true { - i *= 2; - __atomic_wait(_, 0, 1000000000); - sync.barrier_wait(^barrier); - debug_log(.Warning, "Debug message: {}\n", i); - } + return (.Not_Done) if *x < 100 else .Failed; }); } diff --git a/src/app/debug_log.onyx b/src/app/debug_log.onyx index d743fd1..fa054ec 100644 --- a/src/app/debug_log.onyx +++ b/src/app/debug_log.onyx @@ -3,6 +3,7 @@ package debug use package core #private_file { + app :: package app ui :: package ui gfx :: package immediate_mode config :: package config @@ -122,6 +123,18 @@ draw_debug_log :: (window_rectangle: ui.Rectangle, site := #callsite) { if ui.button(clear_button_rect, "Clear log") { debug_log_clear(); } + + r = clear_button_rect; + r.y0 += 50; + r.y1 += 25; + + workers_used := app.workers_being_used(); + for i: workers_used.count { + ui.draw_rect(r, (.{0,0,1}) if workers_used[i] else .{0,0,0.2}); + + r.x0 += (200.0f / ~~workers_used.count); + r.x1 = r.x0 + (200.0f / ~~workers_used.count); + } } #private_file log_buffer : struct { diff --git a/src/app/work_units.onyx b/src/app/work_units.onyx new file mode 100644 index 0000000..8730559 --- /dev/null +++ b/src/app/work_units.onyx @@ -0,0 +1,125 @@ +package app + +use package core +use package debug { debug_log } + +Work_Unit_Status :: enum { + Success; + Failed; + Not_Done; +} + +Work_Unit :: struct { + func: (rawptr) -> Work_Unit_Status; + data: rawptr; + + id : i32; + next : ^Work_Unit; +} + +work_units_initialize :: () { + work_unit_buffer = memory.make_slice(Work_Unit, MAXIMUM_WORK_UNITS); + work_unit_pool = alloc.pool.make(work_unit_buffer); + + sync.mutex_init(^work_unit_mutex); + sync.semaphore_init(^work_unit_signal, 0); + + next_work_unit_id = 1; + + for i: WORKER_COUNT { + worker_data[i].id = i; + thread.spawn(^worker_threads[i], ^worker_data[i], work_unit_processor); + } +} + +work_unit_submit :: (data: rawptr, func: (rawptr) -> Work_Unit_Status) { + sync.mutex_lock(^work_unit_mutex); + work_unit := alloc.pool.pool_alloc(^work_unit_pool); + + work_unit.func = func; + work_unit.data = data; + work_unit.id = next_work_unit_id; + work_unit.next = null; + next_work_unit_id += 1; + + work_unit_insert(work_unit); + sync.mutex_unlock(^work_unit_mutex); +} + +work_unit_insert :: (w: ^Work_Unit) { + sync.mutex_lock(^work_unit_mutex); + + if first_work_unit == null { + first_work_unit = w; + } else { + work_unit := first_work_unit; + while work_unit.next != null do work_unit = work_unit.next; + + work_unit.next = w; + } + + sync.semaphore_post(^work_unit_signal, 1); + sync.mutex_unlock(^work_unit_mutex); +} + +workers_being_used :: () -> [WORKER_COUNT] bool { + used: [WORKER_COUNT] bool; + for i: WORKER_COUNT { + used[i] = worker_data[i].working; + } + + return used; +} + + +#private_file { + +WORKER_COUNT :: 4 +MAXIMUM_WORK_UNITS :: 64 + +Worker_Data :: struct { + id : i32; + working : bool; +} + +worker_data : [WORKER_COUNT] Worker_Data; +worker_threads : [WORKER_COUNT] thread.Thread; + +work_unit_buffer : [] Work_Unit; +work_unit_pool : alloc.pool.PoolAllocator(Work_Unit); + +next_work_unit_id : i32; +first_work_unit : ^Work_Unit; + +work_unit_mutex : sync.Mutex; +work_unit_signal : sync.Semaphore; + +work_unit_processor :: (thread_data: ^Worker_Data) { + while true { + sync.semaphore_wait(^work_unit_signal); + thread_data.working = true; + + sync.mutex_lock(^work_unit_mutex); + work_unit := first_work_unit; + assert(work_unit != null, "work_unit was null."); + first_work_unit = first_work_unit.next; + sync.mutex_unlock(^work_unit_mutex); + + // debug_log(.Debug, "Tasked {} on worker {}.", work_unit.func, thread_data.id); + + result := work_unit.func(work_unit.data); + switch result { + case .Failed { + debug_log(.Error, "Work unit {} failed.", next_work_unit_id); + } + + case .Not_Done { + work_unit_insert(work_unit); + } + } + + thread_data.working = false; + } +} + +} \ No newline at end of file diff --git a/src/build.onyx b/src/build.onyx index 357a339..12b6b76 100644 --- a/src/build.onyx +++ b/src/build.onyx @@ -24,6 +24,7 @@ #load "src/app/storage" #load "src/app/settings" #load "src/app/editor" + #load "src/app/work_units" #load "src/features/load_features" diff --git a/src/features/wasm/wasm.onyx b/src/features/wasm/wasm.onyx index d82e49b..88f0195 100644 --- a/src/features/wasm/wasm.onyx +++ b/src/features/wasm/wasm.onyx @@ -84,7 +84,9 @@ info_window_draw :: (_, win) => { } } -file_loaded :: () { +wasm_processing_thread: thread.Thread; + +process_file :: (_) => { if wasm_analyzed { wasm_utils.free(^wasm_state); wasm_utils.free_sections(^wasm_sections); @@ -95,7 +97,7 @@ file_loaded :: () { file_data := app.state.file.data; debug_log(.Info, "Wasm feature noticed file dropped with size {}", file_data.count); - if !string.starts_with(file_data, u8.[ 0, #char "a", #char "s", #char "m" ]) do return; + if !string.starts_with(file_data, u8.[ 0, #char "a", #char "s", #char "m" ]) do return app.Work_Unit_Status.Failed; debug_log(.Info, "THIS IS PROBABLY A WASM BINARY. ANALYZING!!!"); @@ -109,4 +111,11 @@ file_loaded :: () { } wasm_analyzed = true; + + return app.Work_Unit_Status.Success; +} + +file_loaded :: () { + app.work_unit_submit(null, process_file); + // thread.spawn(^wasm_processing_thread, null, process_file); } -- 2.25.1