added basic implementation of Mutex and Thread. made heap thread safe
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Sun, 17 Oct 2021 20:59:24 +0000 (15:59 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Sun, 17 Oct 2021 20:59:24 +0000 (15:59 -0500)
bin/onyx-js
core/alloc/heap.onyx
core/runtime/common.onyx
core/runtime/js.onyx
core/std.onyx
core/sync/mutex.onyx [new file with mode: 0644]
core/threads/thread.onyx [new file with mode: 0644]

index 0aa2a126a7e4bbba94806e3e753a9164f58b295f..36668b58094c30226d2eff84d500036c90c0fc9c 100755 (executable)
@@ -21,10 +21,11 @@ const ENV = {
             process.exit(status);
         },
 
-        spawn_thread(funcidx, dataptr) {
+        spawn_thread(id, funcidx, dataptr) {
             try {
                 const worker = new Worker(__filename, {
                     workerData: {
+                        thread_id: id,
                         memory: wasm_memory,
                         wasm_bytes: wasm_bytes,
                         funcidx: funcidx,
@@ -32,11 +33,6 @@ const ENV = {
                     },
                 });
 
-                /*worker.on("exit", (code) => {
-                    console.log("THREAD STOPPED");
-                });
-                */
-
                 return 1;
 
             } catch (e) {
@@ -63,7 +59,7 @@ if (isMainThread) {
         });
 
 } else {
-    let { memory, wasm_bytes, funcidx, dataptr } = workerData;
+    let { thread_id, memory, wasm_bytes, funcidx, dataptr } = workerData;
 
     ENV.onyx.memory = memory;
     wasm_memory = memory;
@@ -75,6 +71,7 @@ if (isMainThread) {
 
             const lib = res.instance.exports;
             lib._thread_start(funcidx, dataptr);
+            lib._thread_exit(thread_id);
         });
 }
 
index 09645b8e3bf3504c7f5e3f1f6e3f3febf9b9fcd3..73bd017aee4b3d75b94124e0575fd9c4e2ef2fc1 100644 (file)
@@ -13,6 +13,10 @@ Enable_Debug :: false
 
 #load "core/intrinsics/wasm"
 
+#if runtime.Multi_Threading_Enabled {
+    heap_mutex: sync.Mutex;
+}
+
 init :: () {
     heap_state.free_list = null;
     heap_state.next_alloc = cast(rawptr) (cast(uintptr) __heap_start + 8);
@@ -21,6 +25,10 @@ init :: () {
     use package core.alloc { heap_allocator }
     heap_allocator.data = ^heap_state;
     heap_allocator.func = heap_alloc_proc;
+
+    #if runtime.Multi_Threading_Enabled {
+        sync.mutex_init(^heap_mutex);
+    }
 }
 
 get_watermark :: () => cast(u32) heap_state.next_alloc;
@@ -31,8 +39,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc;
         memory_copy, memory_fill,
     }
 
-    memory :: package core.memory
-    math   :: package core.math
+    memory  :: package core.memory
+    math    :: package core.math
+    runtime :: package runtime
+    sync    :: package core.sync
 
     uintptr :: #type u32
 
@@ -66,6 +76,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc;
     heap_alloc :: (size_: u32, align: u32) -> rawptr {
         if size_ == 0 do return null;
 
+        #if runtime.Multi_Threading_Enabled {
+            sync.scoped_mutex(^heap_mutex);
+        }
+
         size := size_ + sizeof heap_block;
         size = math.max(size, sizeof heap_freed_block);
         memory.align(~~^size, ~~align);
@@ -137,6 +151,9 @@ get_watermark :: () => cast(u32) heap_state.next_alloc;
 
     heap_free :: (ptr: rawptr) {
         #if Enable_Debug do assert(ptr != null, "Trying to free a null pointer.");
+        #if runtime.Multi_Threading_Enabled {
+            sync.scoped_mutex(^heap_mutex);
+        }
 
         hb_ptr := cast(^heap_freed_block) (cast(uintptr) ptr - sizeof heap_allocated_block);
         #if Enable_Debug do assert(hb_ptr.size & Allocated_Flag == Allocated_Flag, "Corrupted heap on free. This could be due to a double free, or using memory past were you allocated it.");
@@ -174,6 +191,10 @@ get_watermark :: () => cast(u32) heap_state.next_alloc;
 
     heap_resize :: (ptr: rawptr, new_size_: u32, align: u32) -> rawptr {
         if ptr == null do return null;
+        
+        #if runtime.Multi_Threading_Enabled {
+            sync.scoped_mutex(^heap_mutex);
+        }
 
         new_size := new_size_ + sizeof heap_block;
         new_size = math.max(new_size, sizeof heap_freed_block);
index 8a3f719ac331039ed4a35c1f13c798f84a2ece71..5064c717f033ad98048e4b07bb665565577780c8 100644 (file)
@@ -30,5 +30,9 @@ __runtime_initialize :: () {
     context.assert_handler = __assert_handler;
 
     __stdio_init();
+
+    #if Multi_Threading_Enabled {
+        thread.__initialize();
+    }
 }
 
index 8155181cad086f1a41d6d2f8114a62d3fcefab0a..63aad0f550ab0601295317b7720f87f500b432e8 100644 (file)
@@ -19,13 +19,17 @@ __exit          :: (status: i32) -> void #foreign "host" "exit" ---
 }
 
 #if Multi_Threading_Enabled {
-    __spawn_thread :: (func: (data: rawptr) -> void, data: rawptr) -> bool #foreign "host" "spawn_thread" ---
+    __spawn_thread :: (id: i32, func: (data: rawptr) -> void, data: rawptr) -> bool #foreign "host" "spawn_thread" ---
 
     #export "_thread_start" (func: (data: rawptr) -> void, data: rawptr) {
-        // Do thread initialization stuff...
-        // Just has to setup a stack frame for itself
         __stack_top = raw_alloc(context.allocator, 1 << 20);
 
+        // Need to initialize thread-local variables here
+
         func(data);
     }
+
+    #export "_thread_exit" (id: i32) {
+        thread.__exited(id);
+    }
 }
\ No newline at end of file
index 43602d9db2c83f1b5aad786f2133ed1ed1417c4d..3b9bc8c5759ba17bbaa5fb6b339b063907f59c7e 100644 (file)
@@ -53,6 +53,8 @@ package core
     #load "./stdio"
 }
 
-#if #defined(runtime.Allow_Multi_Threading) {
+#if runtime.Multi_Threading_Enabled {
     #load "./intrinsics/atomics"
+    #load "./sync/mutex"
+    #load "./threads/thread"
 }
\ No newline at end of file
diff --git a/core/sync/mutex.onyx b/core/sync/mutex.onyx
new file mode 100644 (file)
index 0000000..4b8292b
--- /dev/null
@@ -0,0 +1,35 @@
+package core.sync
+
+use package core.intrinsics.atomics
+
+Mutex :: struct {
+    lock  : i32;
+    // owner : Thread_Id;
+}
+
+mutex_init :: (m: ^Mutex) {
+    m.lock = 0;
+}
+
+mutex_destroy :: (m: ^Mutex) {
+    m.lock = -1;
+}
+
+mutex_lock :: (m: ^Mutex) {
+    while __atomic_cmpxchg(^m.lock, 0, 1) == 1 {
+        __atomic_wait(^m.lock, 0);
+    }
+}
+
+mutex_unlock :: (m: ^Mutex) {
+    __atomic_store(^m.lock, 0);
+    __atomic_notify(^m.lock, maximum = 1);
+}
+
+scoped_mutex :: macro (m: ^Mutex) {
+    ml :: mutex_lock
+    mu :: mutex_unlock
+
+    ml(m);
+    defer mu(m);
+}
\ No newline at end of file
diff --git a/core/threads/thread.onyx b/core/threads/thread.onyx
new file mode 100644 (file)
index 0000000..b720411
--- /dev/null
@@ -0,0 +1,49 @@
+package core.thread
+
+use package core
+
+#private {
+    thread_mutex   : sync.Mutex;
+    next_thread_id := 1;
+    thread_map     : Map(Thread_ID, ^Thread);
+}
+
+Thread_ID :: #type i32
+
+Thread :: struct {
+    id    : Thread_ID;
+    alive : bool;
+}
+
+spawn :: (t: ^Thread, func: (rawptr) -> void, data: rawptr) {
+    sync.scoped_mutex(^thread_mutex);
+
+    t.id    = next_thread_id;
+    t.alive = true;
+    next_thread_id += 1;
+
+    thread_map->put(t.id, t);
+
+    runtime.__spawn_thread(t.id, func, data);
+}
+
+join :: (t: ^Thread) {
+    while t.alive ---;
+}
+
+__initialize :: () {
+    thread_map->init();
+}
+
+__exited :: (id: i32) {
+    sync.scoped_mutex(^thread_mutex);
+
+    thread := thread_map->get(id);
+    if thread != null {
+        thread.alive = false;
+        thread_map->delete(id);
+    }
+}
+
+#private_file runtime :: package runtime
+