From: Brendan Hansen Date: Sun, 17 Oct 2021 20:59:24 +0000 (-0500) Subject: added basic implementation of Mutex and Thread. made heap thread safe X-Git-Url: https://git.brendanfh.com/?a=commitdiff_plain;h=e236d43e0348fd3bd31aba321bf032052164dcdc;p=onyx.git added basic implementation of Mutex and Thread. made heap thread safe --- diff --git a/bin/onyx-js b/bin/onyx-js index 0aa2a126..36668b58 100755 --- a/bin/onyx-js +++ b/bin/onyx-js @@ -21,10 +21,11 @@ const ENV = { process.exit(status); }, - spawn_thread(funcidx, dataptr) { + spawn_thread(id, funcidx, dataptr) { try { const worker = new Worker(__filename, { workerData: { + thread_id: id, memory: wasm_memory, wasm_bytes: wasm_bytes, funcidx: funcidx, @@ -32,11 +33,6 @@ const ENV = { }, }); - /*worker.on("exit", (code) => { - console.log("THREAD STOPPED"); - }); - */ - return 1; } catch (e) { @@ -63,7 +59,7 @@ if (isMainThread) { }); } else { - let { memory, wasm_bytes, funcidx, dataptr } = workerData; + let { thread_id, memory, wasm_bytes, funcidx, dataptr } = workerData; ENV.onyx.memory = memory; wasm_memory = memory; @@ -75,6 +71,7 @@ if (isMainThread) { const lib = res.instance.exports; lib._thread_start(funcidx, dataptr); + lib._thread_exit(thread_id); }); } diff --git a/core/alloc/heap.onyx b/core/alloc/heap.onyx index 09645b8e..73bd017a 100644 --- a/core/alloc/heap.onyx +++ b/core/alloc/heap.onyx @@ -13,6 +13,10 @@ Enable_Debug :: false #load "core/intrinsics/wasm" +#if runtime.Multi_Threading_Enabled { + heap_mutex: sync.Mutex; +} + init :: () { heap_state.free_list = null; heap_state.next_alloc = cast(rawptr) (cast(uintptr) __heap_start + 8); @@ -21,6 +25,10 @@ init :: () { use package core.alloc { heap_allocator } heap_allocator.data = ^heap_state; heap_allocator.func = heap_alloc_proc; + + #if runtime.Multi_Threading_Enabled { + sync.mutex_init(^heap_mutex); + } } get_watermark :: () => cast(u32) heap_state.next_alloc; @@ -31,8 +39,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc; memory_copy, memory_fill, } - memory :: package core.memory - math :: package core.math + memory :: package core.memory + math :: package core.math + runtime :: package runtime + sync :: package core.sync uintptr :: #type u32 @@ -66,6 +76,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc; heap_alloc :: (size_: u32, align: u32) -> rawptr { if size_ == 0 do return null; + #if runtime.Multi_Threading_Enabled { + sync.scoped_mutex(^heap_mutex); + } + size := size_ + sizeof heap_block; size = math.max(size, sizeof heap_freed_block); memory.align(~~^size, ~~align); @@ -137,6 +151,9 @@ get_watermark :: () => cast(u32) heap_state.next_alloc; heap_free :: (ptr: rawptr) { #if Enable_Debug do assert(ptr != null, "Trying to free a null pointer."); + #if runtime.Multi_Threading_Enabled { + sync.scoped_mutex(^heap_mutex); + } hb_ptr := cast(^heap_freed_block) (cast(uintptr) ptr - sizeof heap_allocated_block); #if Enable_Debug do assert(hb_ptr.size & Allocated_Flag == Allocated_Flag, "Corrupted heap on free. This could be due to a double free, or using memory past were you allocated it."); @@ -174,6 +191,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc; heap_resize :: (ptr: rawptr, new_size_: u32, align: u32) -> rawptr { if ptr == null do return null; + + #if runtime.Multi_Threading_Enabled { + sync.scoped_mutex(^heap_mutex); + } new_size := new_size_ + sizeof heap_block; new_size = math.max(new_size, sizeof heap_freed_block); diff --git a/core/runtime/common.onyx b/core/runtime/common.onyx index 8a3f719a..5064c717 100644 --- a/core/runtime/common.onyx +++ b/core/runtime/common.onyx @@ -30,5 +30,9 @@ __runtime_initialize :: () { context.assert_handler = __assert_handler; __stdio_init(); + + #if Multi_Threading_Enabled { + thread.__initialize(); + } } diff --git a/core/runtime/js.onyx b/core/runtime/js.onyx index 8155181c..63aad0f5 100644 --- a/core/runtime/js.onyx +++ b/core/runtime/js.onyx @@ -19,13 +19,17 @@ __exit :: (status: i32) -> void #foreign "host" "exit" --- } #if Multi_Threading_Enabled { - __spawn_thread :: (func: (data: rawptr) -> void, data: rawptr) -> bool #foreign "host" "spawn_thread" --- + __spawn_thread :: (id: i32, func: (data: rawptr) -> void, data: rawptr) -> bool #foreign "host" "spawn_thread" --- #export "_thread_start" (func: (data: rawptr) -> void, data: rawptr) { - // Do thread initialization stuff... - // Just has to setup a stack frame for itself __stack_top = raw_alloc(context.allocator, 1 << 20); + // Need to initialize thread-local variables here + func(data); } + + #export "_thread_exit" (id: i32) { + thread.__exited(id); + } } \ No newline at end of file diff --git a/core/std.onyx b/core/std.onyx index 43602d9d..3b9bc8c5 100644 --- a/core/std.onyx +++ b/core/std.onyx @@ -53,6 +53,8 @@ package core #load "./stdio" } -#if #defined(runtime.Allow_Multi_Threading) { +#if runtime.Multi_Threading_Enabled { #load "./intrinsics/atomics" + #load "./sync/mutex" + #load "./threads/thread" } \ No newline at end of file diff --git a/core/sync/mutex.onyx b/core/sync/mutex.onyx new file mode 100644 index 00000000..4b8292b0 --- /dev/null +++ b/core/sync/mutex.onyx @@ -0,0 +1,35 @@ +package core.sync + +use package core.intrinsics.atomics + +Mutex :: struct { + lock : i32; + // owner : Thread_Id; +} + +mutex_init :: (m: ^Mutex) { + m.lock = 0; +} + +mutex_destroy :: (m: ^Mutex) { + m.lock = -1; +} + +mutex_lock :: (m: ^Mutex) { + while __atomic_cmpxchg(^m.lock, 0, 1) == 1 { + __atomic_wait(^m.lock, 0); + } +} + +mutex_unlock :: (m: ^Mutex) { + __atomic_store(^m.lock, 0); + __atomic_notify(^m.lock, maximum = 1); +} + +scoped_mutex :: macro (m: ^Mutex) { + ml :: mutex_lock + mu :: mutex_unlock + + ml(m); + defer mu(m); +} \ No newline at end of file diff --git a/core/threads/thread.onyx b/core/threads/thread.onyx new file mode 100644 index 00000000..b7204115 --- /dev/null +++ b/core/threads/thread.onyx @@ -0,0 +1,49 @@ +package core.thread + +use package core + +#private { + thread_mutex : sync.Mutex; + next_thread_id := 1; + thread_map : Map(Thread_ID, ^Thread); +} + +Thread_ID :: #type i32 + +Thread :: struct { + id : Thread_ID; + alive : bool; +} + +spawn :: (t: ^Thread, func: (rawptr) -> void, data: rawptr) { + sync.scoped_mutex(^thread_mutex); + + t.id = next_thread_id; + t.alive = true; + next_thread_id += 1; + + thread_map->put(t.id, t); + + runtime.__spawn_thread(t.id, func, data); +} + +join :: (t: ^Thread) { + while t.alive ---; +} + +__initialize :: () { + thread_map->init(); +} + +__exited :: (id: i32) { + sync.scoped_mutex(^thread_mutex); + + thread := thread_map->get(id); + if thread != null { + thread.alive = false; + thread_map->delete(id); + } +} + +#private_file runtime :: package runtime +