From 2bfe8ed68832d4d5ac06ea3a02f8b2d4ee813010 Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Mon, 18 Oct 2021 11:06:22 -0500 Subject: [PATCH] added barrier and condition variable implementation --- core/std.onyx | 5 +++ core/sync/barrier.onyx | 45 +++++++++++++++++++++++++ core/sync/condition_variable.onyx | 56 +++++++++++++++++++++++++++++++ core/sync/mutex.onyx | 14 +++++++- core/sync/semaphore.onyx | 4 +++ core/threads/thread.onyx | 3 +- 6 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 core/sync/barrier.onyx create mode 100644 core/sync/condition_variable.onyx diff --git a/core/std.onyx b/core/std.onyx index 3b9bc8c5..9ad51a9e 100644 --- a/core/std.onyx +++ b/core/std.onyx @@ -55,6 +55,11 @@ package core #if runtime.Multi_Threading_Enabled { #load "./intrinsics/atomics" + #load "./sync/mutex" + #load "./sync/condition_variable" + #load "./sync/semaphore" + #load "./sync/barrier" + #load "./threads/thread" } \ No newline at end of file diff --git a/core/sync/barrier.onyx b/core/sync/barrier.onyx new file mode 100644 index 00000000..6f90e9a3 --- /dev/null +++ b/core/sync/barrier.onyx @@ -0,0 +1,45 @@ +package core.sync + +Barrier :: struct { + mutex : Mutex; + cond : Condition_Variable; + + index : i32; + generation : i32; + thread_count : i32; +} + +barrier_init :: (b: ^Barrier, thread_count: i32) { + mutex_init(^b.mutex); + condition_init(^b.cond); + + b.index = 0; + b.generation = 0; + b.thread_count = thread_count; +} + +barrier_destroy :: (b: ^Barrier) { + mutex_destroy(^b.mutex); + condition_destroy(^b.cond); +} + +barrier_wait :: (b: ^Barrier) { + scoped_mutex(^b.mutex); + + local_gen := b.generation; + b.index += 1; + + if b.index < b.thread_count { + mutex_unlock(^b.mutex); + + while local_gen == b.generation && b.index < b.thread_count { + condition_wait(^b.cond); + } + return; + } + + b.index = 0; + b.generation += 1; + condition_broadcast(^b.cond); + return; +} \ No newline at end of file diff --git a/core/sync/condition_variable.onyx b/core/sync/condition_variable.onyx new file mode 100644 index 00000000..4ad53dca --- /dev/null +++ b/core/sync/condition_variable.onyx @@ -0,0 +1,56 @@ +package core.sync + +// TODO: Free the semaphores after they are used. + +Condition_Variable :: struct { + Node :: struct { + semaphore : Semaphore; + next : ^Node; + } + + mutex: Mutex; + queue: ^Node; +} + +condition_init :: (c: ^Condition_Variable) { + mutex_init(^c.mutex); + c.queue = null; +} + +condition_destroy :: (c: ^Condition_Variable) { + if c.queue != null do condition_broadcast(c); + + mutex_destroy(^c.mutex); +} + +condition_wait :: (c: ^Condition_Variable) { + self: Condition_Variable.Node; + + critical_section(^c.mutex, #code { + self.next = c.queue; + c.queue = ^self; + semaphore_init(^self.semaphore, 0); + }); + + semaphore_wait(^self.semaphore); +} + +condition_signal :: (c: ^Condition_Variable) { + scoped_mutex(^c.mutex); + + if c.queue != null { + semaphore_post(^c.queue.semaphore); + semaphore_destroy(^c.queue.semaphore); + c.queue = c.queue.next; + } +} + +condition_broadcast :: (c: ^Condition_Variable) { + scoped_mutex(^c.mutex); + + while c.queue != null { + semaphore_post(^c.queue.semaphore); + semaphore_destroy(^c.queue.semaphore); + c.queue = c.queue.next; + } +} \ No newline at end of file diff --git a/core/sync/mutex.onyx b/core/sync/mutex.onyx index ecaf5518..4960870f 100644 --- a/core/sync/mutex.onyx +++ b/core/sync/mutex.onyx @@ -1,6 +1,7 @@ package core.sync use package core.intrinsics.atomics +use package core.thread { Thread_ID } // `lock` has two states: 0, and 1. // 0 means unlocked @@ -16,7 +17,7 @@ use package core.intrinsics.atomics Mutex :: struct { lock : i32; - // owner : Thread_Id; + owner : Thread_ID; } mutex_init :: (m: ^Mutex) { @@ -31,9 +32,12 @@ mutex_lock :: (m: ^Mutex) { while __atomic_cmpxchg(^m.lock, 0, 1) == 1 { __atomic_wait(^m.lock, 1); } + + m.owner = context.thread_id; } mutex_unlock :: (m: ^Mutex) { + m.owner = -1; __atomic_store(^m.lock, 0); __atomic_notify(^m.lock, maximum = 1); } @@ -44,4 +48,12 @@ scoped_mutex :: macro (m: ^Mutex) { ml(m); defer mu(m); +} + +critical_section :: macro (m: ^Mutex, body: Code) -> i32 { + scoped_mutex(m); + + #insert body; + + return 0; } \ No newline at end of file diff --git a/core/sync/semaphore.onyx b/core/sync/semaphore.onyx index 7ecf5f40..9ee288fa 100644 --- a/core/sync/semaphore.onyx +++ b/core/sync/semaphore.onyx @@ -13,6 +13,10 @@ semaphore_init :: (s: ^Semaphore, value: i32) { mutex_init(^s.mutex); } +semaphore_destroy :: (s: ^Semaphore) { + mutex_destroy(^s.mutex); +} + semaphore_post :: (s: ^Semaphore) { scoped_mutex(^s.mutex); s.counter += 1; diff --git a/core/threads/thread.onyx b/core/threads/thread.onyx index 8ee99cb6..933e9d69 100644 --- a/core/threads/thread.onyx +++ b/core/threads/thread.onyx @@ -4,6 +4,8 @@ use package core use package core.intrinsics.atomics #private { + runtime :: package runtime + thread_mutex : sync.Mutex; next_thread_id := 1; thread_map : Map(Thread_ID, ^Thread); @@ -48,5 +50,4 @@ __exited :: (id: i32) { } } -#private_file runtime :: package runtime -- 2.25.1