documented sync library
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Mon, 13 Feb 2023 21:49:54 +0000 (15:49 -0600)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Mon, 13 Feb 2023 21:49:54 +0000 (15:49 -0600)
core/sync/barrier.onyx
core/sync/condition_variable.onyx
core/sync/mutex.onyx
core/sync/once.onyx
core/sync/semaphore.onyx

index b31fc696b33b61e4308363bc222e87dc770f0a0e..f5d8259220d853468e195fcfe8d7b64dce4b1d4d 100644 (file)
@@ -1,5 +1,21 @@
 package core.sync
 
+//
+// A barrier is a special case of a condition variable that
+// creates a special point in the code where all threads have
+// to "sync up", before any thread can continue. Barriers
+// are generally used between steps of parallel processing
+// to ensure every thread is where it needs to be.
+//
+// All threads simply call barrier_wait to signal that they
+// have reached that point in the code. The last thread to
+// call barrier_wait will wake up every other thread and
+// continue processing.
+//
+
+//
+// Represents a generational barrier, so the same barrier
+// can be used safely multiple times.
 Barrier :: struct {
     mutex : Mutex;
     cond  : Condition_Variable;
@@ -9,6 +25,8 @@ Barrier :: struct {
     thread_count : i32;
 }
 
+//
+// Initializes a new generational barrier with `thread_count` threads.
 barrier_init :: (b: ^Barrier, thread_count: i32) {
     mutex_init(^b.mutex);
     condition_init(^b.cond);
@@ -18,11 +36,16 @@ barrier_init :: (b: ^Barrier, thread_count: i32) {
     b.thread_count = thread_count;
 }
 
+//
+// Destroys a generational barrier.
 barrier_destroy :: (b: ^Barrier) {
     mutex_destroy(^b.mutex);
     condition_destroy(^b.cond);
 }
 
+//
+// Signals that a thread has reached the barrier.
+// The last thread to reach the barrier will wake up all other threads.
 barrier_wait :: (b: ^Barrier) {
     mutex_lock(^b.mutex);
     defer mutex_unlock(^b.mutex);
index 8aa89274a24b8ce32546cfdb05d560080f71b466..c8eacd2215c87dc9d353a728d24ca570c6e74bdf 100644 (file)
@@ -2,6 +2,26 @@ package core.sync
 
 // TODO: Free the semaphores after they are used.
 
+//
+// A condition variable is used to implement a queue of threads
+// waiting for a condition to be true. Each thread joins the queue
+// using `condition_wait`. Then, another thread can signal that
+// the condition has changed and can "wake up" the first thread in
+// the queue using `condition_signal`. Alternatively, all threads
+// can be woken up using `condition_broadcast`.
+//
+// Condition variables are generally used to prevent spin checking
+// a condition and waiting for it to change. Instead, the thread
+// joins a wait-queue, and leave it up to another thread to wake
+// it up to continue processing. However sadly, in WebAssembly this
+// is not possible because with the atomic_wait and atomic_notify
+// instructions, which currently are not supported by any runtime
+// outside of the browser.
+//
+
+//
+// Represents a condition variable, with a mutex used to
+// protect the queue operations.
 Condition_Variable :: struct {
     Node :: struct {
         semaphore : Semaphore;
@@ -12,17 +32,25 @@ Condition_Variable :: struct {
     queue: ^Node;
 }
 
+//
+// Initializes a new condition variable.
 condition_init :: (c: ^Condition_Variable) {
     mutex_init(^c.mutex);
     c.queue = null;
 }
 
+//
+// Destroys a condition variable.
 condition_destroy :: (c: ^Condition_Variable) {
     if c.queue != null do condition_broadcast(c);
 
     mutex_destroy(^c.mutex);
 }
 
+//
+// Enters the thread in the wait-queue of the condition variable.
+// If `m` is not null, the mutex will first be released before
+// entering the queue, and then relocked before returning.
 condition_wait :: (c: ^Condition_Variable, m: ^Mutex) {
     node: Condition_Variable.Node;
 
@@ -37,6 +65,8 @@ condition_wait :: (c: ^Condition_Variable, m: ^Mutex) {
     if m != null do mutex_lock(m);
 }
 
+//
+// Wakes up one thread from the wait-queue.
 condition_signal :: (c: ^Condition_Variable) {
     scoped_mutex(^c.mutex);
 
@@ -46,6 +76,8 @@ condition_signal :: (c: ^Condition_Variable) {
     }
 }
 
+//
+// Wakes up all threads from the wait-queue.
 condition_broadcast :: (c: ^Condition_Variable) {
     scoped_mutex(^c.mutex);
 
@@ -53,4 +85,4 @@ condition_broadcast :: (c: ^Condition_Variable) {
         semaphore_post(^c.queue.semaphore);
         c.queue = c.queue.next;
     }
-}
\ No newline at end of file
+}
index c730b385b74cece2ed1f05fac3501977ae6da0ef..7c40b39869c04bb2170950a21e3d0a6e7b1722fd 100644 (file)
@@ -3,6 +3,15 @@ package core.sync
 use core.intrinsics.atomics
 use core.thread { Thread_ID }
 
+//
+// A mutex represents a resource that can only be held by one
+// thread at a time. It is used to create sections of code that
+// only one thread can be in at a time.
+//
+// Mutexes in WebAssembly are very cheap, because they simply
+// use the atomic_cmpxchg intrinsic to operate. This only uses
+// memory, so no real resource allocation is necessary.
+//
 // `lock` has two states: 0, and 1.
 //    0 means unlocked
 //    1 means locked
@@ -14,22 +23,31 @@ use core.thread { Thread_ID }
 // To unlock it:
 //    Atomically set it to 0.
 //    Notify at most 1 other thread about this change.
-
+//
 Mutex :: struct {
     lock  : i32;
     owner : Thread_ID;
 }
 
+//
+// Initializes a new mutex.
 mutex_init :: (m: ^Mutex) {
     m.lock = 0;
     m.owner = -1;
 }
 
+//
+// Destroys a mutex.
 mutex_destroy :: (m: ^Mutex) {
     m.lock = -1;
     m.owner = -1;
 }
 
+//
+// Locks a mutex. If the mutex is currently held by another thread,
+// this function enters a spin loop until the mutex is unlocked.
+// In a JavaScript based implementation, the __atomic_wait intrinsic
+// is used to avoid having to spin loop.
 mutex_lock :: (m: ^Mutex) {
     while __atomic_cmpxchg(^m.lock, 0, 1) == 1 {
         if m.owner == context.thread_id do return;
@@ -50,6 +68,10 @@ mutex_lock :: (m: ^Mutex) {
     m.owner = context.thread_id;
 }
 
+//
+// Unlocks a mutex, if the calling thread currently holds the mutex.
+// In a JavaScript based implementation, the __atomic_notify intrinsic
+// is used to wake up one waiting thread.
 mutex_unlock :: (m: ^Mutex) {
     if m.owner != context.thread_id do return;
     
@@ -61,6 +83,17 @@ mutex_unlock :: (m: ^Mutex) {
     }
 }
 
+//
+// Helpful macro for making a particular block be protected by a macro.
+//
+//     m: sync.Mutx;
+//     sync.mutex_init(^m);
+//
+//     {
+//        sync.scoped_mutex(^m);
+//        // Everything here is done by one thread at a time.
+//     }
+//
 scoped_mutex :: macro (m: ^Mutex) {
     ml :: mutex_lock
     mu :: mutex_unlock
@@ -69,6 +102,17 @@ scoped_mutex :: macro (m: ^Mutex) {
     defer mu(m);
 }
 
+//
+// Abstracts the pattern decribed in scoped_mutex by automatically
+// calling scoped_mutex in the block of code given.
+//
+//     m: sync.Mutx;
+//     sync.mutex_init(^m);
+//
+//     sync.critical_section(^m) {
+//        // Everything here is done by one thread at a time.
+//     }
+//
 critical_section :: macro (m: ^Mutex, body: Code) -> i32 {
     scoped_mutex :: scoped_mutex;
     scoped_mutex(m);
index 22cf77297d575dc90975ed88185ca947a886cf95..b1efe34a0641f9313c7795a488c83de624feda39 100644 (file)
@@ -1,5 +1,12 @@
 package core.sync
 
+//
+// Once is a thread-safe mechanism for executing a particular
+// function only once. It is simply a flag with a mutex.
+//
+
+//
+// Represents something will only happen once.
 Once :: struct {
     done: bool;
     mutex: Mutex;
@@ -7,6 +14,8 @@ Once :: struct {
 
 #inject Once.exec :: #match #local {}
 
+//
+// Run a function with no arguments once.
 #overload
 Once.exec :: (o: ^Once, f: () -> $R) {
     scoped_mutex(^o.mutex);
@@ -16,6 +25,8 @@ Once.exec :: (o: ^Once, f: () -> $R) {
     f();
 }
 
+//
+// Run a function with one argument once.
 #overload
 Once.exec :: (o: ^Once, ctx: $Ctx, f: (Ctx) -> $R) {
     scoped_mutex(^o.mutex);
index 44e229987e8a841a0dbef8445d6da25e965169a8..4b4d7e80a2c70ea88c5102639290922d69a61cd7 100644 (file)
@@ -2,34 +2,60 @@ package core.sync
 
 use core.intrinsics.atomics
 
+//
+// A semaphore represents a counter that can only be incremented
+// and decremented by one thread at a time. "Waiting" on a semaphore
+// means decrementing the counter by 1 if it is greater than 0, otherwise
+// waiting until the counter is incremented. "Posting" on a semaphore
+// means incrementing the counter by a certain value, in turn releasing
+// other threads that might have been waiting for the value to change.
+//
+// Semaphores are generally used for controlling access to shared
+// resources. For a contrived example, say only 4 threads can use
+// a given network connection at a time. A semaphore would be created
+// with a value of 4. When a thread wants to use the network connection,
+// it would use `semaphore_wait` to obtain the resource, or wait if
+// the network is currently available. When it is done using the
+// network, it would call `semaphore_post` to release the resource,
+// allowing another thread to use it.
+//
 Semaphore :: struct {
     mutex   : Mutex;
     counter : i32;
 }
 
+//
+// Initializes a semaphore with the specified value.
 semaphore_init :: (s: ^Semaphore, value: i32) {
     s.counter = value;
 
     mutex_init(^s.mutex);
 }
 
+//
+// Destroys a semaphore.
 semaphore_destroy :: (s: ^Semaphore) {
     mutex_destroy(^s.mutex);
 }
 
+//
+// Increment the counter in a semaphore by `count`.
 semaphore_post :: (s: ^Semaphore, count := 1) {
     if count == 0 do return;
     
     scoped_mutex(^s.mutex);
     s.counter += count;
 
-    // @Bug // This is susceptible to starvation. Semaphores should have a queue
+    // @Bug
+    // This is susceptible to starvation. Semaphores should have a queue
     // or something like that.
     #if runtime.Wait_Notify_Available {
         __atomic_notify(^s.counter, maximum = count);
     }
 }
 
+//
+// Waits until the thread is able to decrement one from the semaphore.
 semaphore_wait :: (s: ^Semaphore) {
     while true {
         mutex_lock(^s.mutex);
@@ -47,4 +73,4 @@ semaphore_wait :: (s: ^Semaphore) {
             }
         }
     }
-}
\ No newline at end of file
+}