From 0eb939e8fe959b32ca78df6d4f46712e03fd63ae Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Mon, 13 Feb 2023 15:49:54 -0600 Subject: [PATCH] documented sync library --- core/sync/barrier.onyx | 23 ++++++++++++++++ core/sync/condition_variable.onyx | 34 ++++++++++++++++++++++- core/sync/mutex.onyx | 46 ++++++++++++++++++++++++++++++- core/sync/once.onyx | 11 ++++++++ core/sync/semaphore.onyx | 30 ++++++++++++++++++-- 5 files changed, 140 insertions(+), 4 deletions(-) diff --git a/core/sync/barrier.onyx b/core/sync/barrier.onyx index b31fc696..f5d82592 100644 --- a/core/sync/barrier.onyx +++ b/core/sync/barrier.onyx @@ -1,5 +1,21 @@ package core.sync +// +// A barrier is a special case of a condition variable that +// creates a special point in the code where all threads have +// to "sync up", before any thread can continue. Barriers +// are generally used between steps of parallel processing +// to ensure every thread is where it needs to be. +// +// All threads simply call barrier_wait to signal that they +// have reached that point in the code. The last thread to +// call barrier_wait will wake up every other thread and +// continue processing. +// + +// +// Represents a generational barrier, so the same barrier +// can be used safely multiple times. Barrier :: struct { mutex : Mutex; cond : Condition_Variable; @@ -9,6 +25,8 @@ Barrier :: struct { thread_count : i32; } +// +// Initializes a new generational barrier with `thread_count` threads. barrier_init :: (b: ^Barrier, thread_count: i32) { mutex_init(^b.mutex); condition_init(^b.cond); @@ -18,11 +36,16 @@ barrier_init :: (b: ^Barrier, thread_count: i32) { b.thread_count = thread_count; } +// +// Destroys a generational barrier. barrier_destroy :: (b: ^Barrier) { mutex_destroy(^b.mutex); condition_destroy(^b.cond); } +// +// Signals that a thread has reached the barrier. +// The last thread to reach the barrier will wake up all other threads. 
barrier_wait :: (b: ^Barrier) { mutex_lock(^b.mutex); defer mutex_unlock(^b.mutex); diff --git a/core/sync/condition_variable.onyx b/core/sync/condition_variable.onyx index 8aa89274..c8eacd22 100644 --- a/core/sync/condition_variable.onyx +++ b/core/sync/condition_variable.onyx @@ -2,6 +2,26 @@ package core.sync // TODO: Free the semaphores after they are used. +// +// A condition variable is used to implement a queue of threads +// waiting for a condition to be true. Each thread joins the queue +// using `condition_wait`. Then, another thread can signal that +// the condition has changed and can "wake up" the first thread in +// the queue using `condition_signal`. Alternatively, all threads +// can be woken up using `condition_broadcast`. +// +// Condition variables are generally used to prevent spin checking +// a condition and waiting for it to change. Instead, the thread +// joins a wait-queue, and leaves it up to another thread to wake +// it up to continue processing. However sadly, in WebAssembly this +// is only possible with the atomic_wait and atomic_notify +// instructions, which currently are not supported by any runtime +// outside of the browser. +// + +// +// Represents a condition variable, with a mutex used to +// protect the queue operations. Condition_Variable :: struct { Node :: struct { semaphore : Semaphore; next : ^Node; } mutex: Mutex; queue: ^Node; } +// +// Initializes a new condition variable. condition_init :: (c: ^Condition_Variable) { mutex_init(^c.mutex); c.queue = null; } +// +// Destroys a condition variable. condition_destroy :: (c: ^Condition_Variable) { if c.queue != null do condition_broadcast(c); mutex_destroy(^c.mutex); } +// +// Enters the thread in the wait-queue of the condition variable. +// If `m` is not null, the mutex will first be released before +// entering the queue, and then relocked before returning. 
condition_wait :: (c: ^Condition_Variable, m: ^Mutex) { node: Condition_Variable.Node; @@ -37,6 +65,8 @@ condition_wait :: (c: ^Condition_Variable, m: ^Mutex) { if m != null do mutex_lock(m); } +// +// Wakes up one thread from the wait-queue. condition_signal :: (c: ^Condition_Variable) { scoped_mutex(^c.mutex); @@ -46,6 +76,8 @@ condition_signal :: (c: ^Condition_Variable) { } } +// +// Wakes up all threads from the wait-queue. condition_broadcast :: (c: ^Condition_Variable) { scoped_mutex(^c.mutex); @@ -53,4 +85,4 @@ condition_broadcast :: (c: ^Condition_Variable) { semaphore_post(^c.queue.semaphore); c.queue = c.queue.next; } -} \ No newline at end of file +} diff --git a/core/sync/mutex.onyx b/core/sync/mutex.onyx index c730b385..7c40b398 100644 --- a/core/sync/mutex.onyx +++ b/core/sync/mutex.onyx @@ -3,6 +3,15 @@ package core.sync use core.intrinsics.atomics use core.thread { Thread_ID } +// +// A mutex represents a resource that can only be held by one +// thread at a time. It is used to create sections of code that +// only one thread can be in at a time. +// +// Mutexes in WebAssembly are very cheap, because they simply +// use the atomic_cmpxchg intrinsic to operate. This only uses +// memory, so no real resource allocation is necessary. +// // `lock` has two states: 0, and 1. // 0 means unlocked // 1 means locked @@ -14,22 +23,31 @@ use core.thread { Thread_ID } // To unlock it: // Atomically set it to 0. // Notify at most 1 other thread about this change. - +// Mutex :: struct { lock : i32; owner : Thread_ID; } +// +// Initializes a new mutex. mutex_init :: (m: ^Mutex) { m.lock = 0; m.owner = -1; } +// +// Destroys a mutex. mutex_destroy :: (m: ^Mutex) { m.lock = -1; m.owner = -1; } +// +// Locks a mutex. If the mutex is currently held by another thread, +// this function enters a spin loop until the mutex is unlocked. +// In a JavaScript based implementation, the __atomic_wait intrinsic +// is used to avoid having to spin loop. 
mutex_lock :: (m: ^Mutex) { while __atomic_cmpxchg(^m.lock, 0, 1) == 1 { if m.owner == context.thread_id do return; @@ -50,6 +68,10 @@ mutex_lock :: (m: ^Mutex) { m.owner = context.thread_id; } +// +// Unlocks a mutex, if the calling thread currently holds the mutex. +// In a JavaScript based implementation, the __atomic_notify intrinsic +// is used to wake up one waiting thread. mutex_unlock :: (m: ^Mutex) { if m.owner != context.thread_id do return; @@ -61,6 +83,17 @@ mutex_unlock :: (m: ^Mutex) { } } +// +// Helpful macro for making a particular block be protected by a mutex. +// +// m: sync.Mutex; +// sync.mutex_init(^m); +// +// { +// sync.scoped_mutex(^m); +// // Everything here is done by one thread at a time. +// } +// scoped_mutex :: macro (m: ^Mutex) { ml :: mutex_lock mu :: mutex_unlock @@ -69,6 +102,17 @@ scoped_mutex :: macro (m: ^Mutex) { defer mu(m); } +// +// Abstracts the pattern described in scoped_mutex by automatically +// calling scoped_mutex in the block of code given. +// +// m: sync.Mutex; +// sync.mutex_init(^m); +// +// sync.critical_section(^m) { +// // Everything here is done by one thread at a time. +// } +// critical_section :: macro (m: ^Mutex, body: Code) -> i32 { scoped_mutex :: scoped_mutex; scoped_mutex(m); diff --git a/core/sync/once.onyx b/core/sync/once.onyx index 22cf7729..b1efe34a 100644 --- a/core/sync/once.onyx +++ b/core/sync/once.onyx @@ -1,5 +1,12 @@ package core.sync +// +// Once is a thread-safe mechanism for executing a particular +// function only once. It is simply a flag with a mutex. +// + +// +// Represents something that will only happen once. Once :: struct { done: bool; mutex: Mutex; @@ -7,6 +14,8 @@ #inject Once.exec :: #match #local {} +// +// Run a function with no arguments once. #overload Once.exec :: (o: ^Once, f: () -> $R) { scoped_mutex(^o.mutex); @@ -16,6 +25,8 @@ f(); } +// +// Run a function with one argument once. 
#overload Once.exec :: (o: ^Once, ctx: $Ctx, f: (Ctx) -> $R) { scoped_mutex(^o.mutex); diff --git a/core/sync/semaphore.onyx b/core/sync/semaphore.onyx index 44e22998..4b4d7e80 100644 --- a/core/sync/semaphore.onyx +++ b/core/sync/semaphore.onyx @@ -2,34 +2,60 @@ package core.sync use core.intrinsics.atomics +// +// A semaphore represents a counter that can only be incremented +// and decremented by one thread at a time. "Waiting" on a semaphore +// means decrementing the counter by 1 if it is greater than 0, otherwise +// waiting until the counter is incremented. "Posting" on a semaphore +// means incrementing the counter by a certain value, in turn releasing +// other threads that might have been waiting for the value to change. +// +// Semaphores are generally used for controlling access to shared +// resources. For a contrived example, say only 4 threads can use +// a given network connection at a time. A semaphore would be created +// with a value of 4. When a thread wants to use the network connection, +// it would use `semaphore_wait` to obtain the resource, or wait if +// the network is currently unavailable. When it is done using the +// network, it would call `semaphore_post` to release the resource, +// allowing another thread to use it. +// Semaphore :: struct { mutex : Mutex; counter : i32; } +// +// Initializes a semaphore with the specified value. semaphore_init :: (s: ^Semaphore, value: i32) { s.counter = value; mutex_init(^s.mutex); } +// +// Destroys a semaphore. semaphore_destroy :: (s: ^Semaphore) { mutex_destroy(^s.mutex); } +// +// Increment the counter in a semaphore by `count`. semaphore_post :: (s: ^Semaphore, count := 1) { if count == 0 do return; scoped_mutex(^s.mutex); s.counter += count; - // @Bug // This is susceptible to starvation. Semaphores should have a queue + // @Bug + // This is susceptible to starvation. Semaphores should have a queue + // or something like that. 
#if runtime.Wait_Notify_Available { __atomic_notify(^s.counter, maximum = count); } } +// +// Waits until the thread is able to decrement one from the semaphore. semaphore_wait :: (s: ^Semaphore) { while true { mutex_lock(^s.mutex); @@ -47,4 +73,4 @@ semaphore_wait :: (s: ^Semaphore) { } } } -} \ No newline at end of file +} -- 2.25.1