added barrier and condition variable implementation
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Mon, 18 Oct 2021 16:06:22 +0000 (11:06 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Mon, 18 Oct 2021 16:06:22 +0000 (11:06 -0500)
core/std.onyx
core/sync/barrier.onyx [new file with mode: 0644]
core/sync/condition_variable.onyx [new file with mode: 0644]
core/sync/mutex.onyx
core/sync/semaphore.onyx
core/threads/thread.onyx

index 3b9bc8c5759ba17bbaa5fb6b339b063907f59c7e..9ad51a9eb87bc2b9ebba25aaa99bd40f03aadce8 100644 (file)
@@ -55,6 +55,11 @@ package core
 
 #if runtime.Multi_Threading_Enabled {
     #load "./intrinsics/atomics"
+
     #load "./sync/mutex"
+    #load "./sync/condition_variable"
+    #load "./sync/semaphore"
+    #load "./sync/barrier"
+
     #load "./threads/thread"
 }
\ No newline at end of file
diff --git a/core/sync/barrier.onyx b/core/sync/barrier.onyx
new file mode 100644 (file)
index 0000000..6f90e9a
--- /dev/null
@@ -0,0 +1,45 @@
+package core.sync
+
+Barrier :: struct {
+    mutex : Mutex;
+    cond  : Condition_Variable;
+
+    index        : i32;
+    generation   : i32;
+    thread_count : i32;
+}
+
+barrier_init :: (b: ^Barrier, thread_count: i32) {
+    mutex_init(^b.mutex);
+    condition_init(^b.cond);
+
+    b.index = 0;
+    b.generation = 0;
+    b.thread_count = thread_count;
+}
+
+barrier_destroy :: (b: ^Barrier) {
+    mutex_destroy(^b.mutex);
+    condition_destroy(^b.cond);
+}
+
+barrier_wait :: (b: ^Barrier) {
+    scoped_mutex(^b.mutex);
+
+    local_gen := b.generation;
+    b.index += 1;
+
+    if b.index < b.thread_count {
+        mutex_unlock(^b.mutex);
+
+        while local_gen == b.generation && b.index < b.thread_count {
+            condition_wait(^b.cond);
+        }
+        return;
+    }
+
+    b.index = 0;
+    b.generation += 1;
+    condition_broadcast(^b.cond);
+    return;
+}
\ No newline at end of file
diff --git a/core/sync/condition_variable.onyx b/core/sync/condition_variable.onyx
new file mode 100644 (file)
index 0000000..4ad53dc
--- /dev/null
@@ -0,0 +1,56 @@
+package core.sync
+
+// TODO: Free the semaphores after they are used.
+
+Condition_Variable :: struct {
+    Node :: struct {
+        semaphore : Semaphore;
+        next      : ^Node;
+    }
+
+    mutex: Mutex;
+    queue: ^Node;
+}
+
+condition_init :: (c: ^Condition_Variable) {
+    mutex_init(^c.mutex);
+    c.queue = null;
+}
+
+condition_destroy :: (c: ^Condition_Variable) {
+    if c.queue != null do condition_broadcast(c);
+
+    mutex_destroy(^c.mutex);
+}
+
+condition_wait :: (c: ^Condition_Variable) {
+    self: Condition_Variable.Node;
+
+    critical_section(^c.mutex, #code {
+        self.next = c.queue;
+        c.queue   = ^self;
+        semaphore_init(^self.semaphore, 0);
+    });
+
+    semaphore_wait(^self.semaphore);
+}
+
+condition_signal :: (c: ^Condition_Variable) {
+    scoped_mutex(^c.mutex);
+
+    if c.queue != null {
+        semaphore_post(^c.queue.semaphore);
+        semaphore_destroy(^c.queue.semaphore);
+        c.queue = c.queue.next;
+    }
+}
+
+condition_broadcast :: (c: ^Condition_Variable) {
+    scoped_mutex(^c.mutex);
+
+    while c.queue != null {
+        semaphore_post(^c.queue.semaphore);
+        semaphore_destroy(^c.queue.semaphore);
+        c.queue = c.queue.next;
+    }
+}
\ No newline at end of file
index ecaf5518a3b230951c2444696cec443bba4dcb20..4960870f18dfe95f59898a68387455a1c3141121 100644 (file)
@@ -1,6 +1,7 @@
 package core.sync
 
 use package core.intrinsics.atomics
+use package core.thread { Thread_ID }
 
 // `lock` has two states: 0, and 1.
 //    0 means unlocked
@@ -16,7 +17,7 @@ use package core.intrinsics.atomics
 
 Mutex :: struct {
     lock  : i32;
-    // owner : Thread_Id;
+    owner : Thread_ID;
 }
 
 mutex_init :: (m: ^Mutex) {
@@ -31,9 +32,12 @@ mutex_lock :: (m: ^Mutex) {
     while __atomic_cmpxchg(^m.lock, 0, 1) == 1 {
         __atomic_wait(^m.lock, 1);
     }
+
+    m.owner = context.thread_id;
 }
 
 mutex_unlock :: (m: ^Mutex) {
+    m.owner = -1;
     __atomic_store(^m.lock, 0);
     __atomic_notify(^m.lock, maximum = 1);
 }
@@ -44,4 +48,12 @@ scoped_mutex :: macro (m: ^Mutex) {
 
     ml(m);
     defer mu(m);
+}
+
+critical_section :: macro (m: ^Mutex, body: Code) -> i32 {
+    scoped_mutex(m);
+
+    #insert body;
+
+    return 0;
 }
\ No newline at end of file
index 7ecf5f40a0bdc5be7629abdbcf9d69d6a8eab3ea..9ee288fa57dae7bbbaa84d6041b389c307058873 100644 (file)
@@ -13,6 +13,10 @@ semaphore_init :: (s: ^Semaphore, value: i32) {
     mutex_init(^s.mutex);
 }
 
+semaphore_destroy :: (s: ^Semaphore) {
+    mutex_destroy(^s.mutex);
+}
+
 semaphore_post :: (s: ^Semaphore) {
     scoped_mutex(^s.mutex);
     s.counter += 1;
index 8ee99cb676bbdcc1b3f1fb427e485b5e79e6e327..933e9d69b85aa0415a8c57a2076e4fe0dced9aee 100644 (file)
@@ -4,6 +4,8 @@ use package core
 use package core.intrinsics.atomics
 
 #private {
+    runtime :: package runtime
+
     thread_mutex   : sync.Mutex;
     next_thread_id := 1;
     thread_map     : Map(Thread_ID, ^Thread);
@@ -48,5 +50,4 @@ __exited :: (id: i32) {
     }
 }
 
-#private_file runtime :: package runtime