package core.sync
+//
+// A barrier is a special case of a condition variable that
+// creates a special point in the code where all threads have
+// to "sync up", before any thread can continue. Barriers
+// are generally used between steps of parallel processing
+// to ensure every thread is where it needs to be.
+//
+// All threads simply call barrier_wait to signal that they
+// have reached that point in the code. The last thread to
+// call barrier_wait will wake up every other thread and
+// continue processing.
+//
+
+//
+// Represents a generational barrier, so the same barrier
+// can be used safely multiple times.
// A generational barrier: threads block in barrier_wait until
// `thread_count` threads have arrived.
Barrier :: struct {
    mutex : Mutex;                // Protects the barrier's internal state.
    cond : Condition_Variable;    // Arriving threads park here until release.
    thread_count : i32;           // Number of participating threads (set by barrier_init).
}
+//
+// Initializes a new generational barrier with `thread_count` threads.
// Prepares a barrier for use by `thread_count` threads.
barrier_init :: (b: ^Barrier, thread_count: i32) {
    // Record the number of participants, then set up the
    // primitives used to park and release them.
    b.thread_count = thread_count;

    mutex_init(^b.mutex);
    condition_init(^b.cond);
}
+//
+// Destroys a generational barrier.
// Releases the resources held by a barrier. The barrier must not
// be in use when this is called.
barrier_destroy :: (b: ^Barrier) {
    condition_destroy(^b.cond);
    mutex_destroy(^b.mutex);
}
+//
+// Signals that a thread has reached the barrier.
+// The last thread to reach the barrier will wake up all other threads.
// Signals that the calling thread has reached the barrier, holding
// the barrier's mutex for the duration of the call.
barrier_wait :: (b: ^Barrier) {
    mutex_lock(^b.mutex);
    defer mutex_unlock(^b.mutex);

    // NOTE(review): the body appears truncated here — the arrival
    // counting and condition-wait/broadcast logic described in the
    // header comment is missing. Verify against the original source.
    // TODO: Free the semaphores after they are used.
+//
+// A condition variable is used to implement a queue of threads
+// waiting for a condition to be true. Each thread joins the queue
+// using `condition_wait`. Then, another thread can signal that
+// the condition has changed and can "wake up" the first thread in
+// the queue using `condition_signal`. Alternatively, all threads
+// can be woken up using `condition_broadcast`.
+//
+// Condition variables are generally used to prevent spin checking
+// a condition and waiting for it to change. Instead, the thread
+// joins a wait-queue, and leaves it up to another thread to wake
+// it up to continue processing. However, in WebAssembly this
+// relies on the atomic_wait and atomic_notify instructions,
+// which currently are not supported by any runtime outside of
+// the browser.
+//
+
+//
+// Represents a condition variable, with a mutex used to
+// protect the queue operations.
// The wait-queue is a linked list of per-thread nodes, each carrying
// the semaphore that the waiting thread blocks on.
Condition_Variable :: struct {
    // One queue entry per waiting thread.
    Node :: struct {
        semaphore : Semaphore;    // The waiting thread blocks on this.
        queue: ^Node;             // Next node in the wait-queue.
        // NOTE(review): condition_broadcast accesses a `next` field on
        // queue nodes, but this declaration names the link `queue` —
        // verify which name is intended.
    }
+//
+// Initializes a new condition variable.
// Prepares a condition variable with an empty wait-queue.
condition_init :: (c: ^Condition_Variable) {
    c.queue = null;
    mutex_init(^c.mutex);
}
+//
+// Destroys a condition variable.
// Tears down a condition variable, releasing any threads that are
// still parked in its wait-queue first.
condition_destroy :: (c: ^Condition_Variable) {
    if c.queue != null {
        condition_broadcast(c);
    }

    mutex_destroy(^c.mutex);
}
+//
+// Enters the thread in the wait-queue of the condition variable.
+// If `m` is not null, the mutex will first be released before
+// entering the queue, and then relocked before returning.
// Enters the calling thread into the wait-queue of the condition
// variable. If `m` is non-null, it is expected to be held on return.
condition_wait :: (c: ^Condition_Variable, m: ^Mutex) {
    // Stack-allocated queue node for this thread.
    node: Condition_Variable.Node;

    // NOTE(review): the enqueue / block-on-semaphore logic appears to
    // be missing — `node` is never used and as written this only
    // (re)locks `m`. Verify against the original source.
    if m != null do mutex_lock(m);
}
+//
+// Wakes up one thread from the wait-queue.
// Wakes up one thread from the wait-queue.
condition_signal :: (c: ^Condition_Variable) {
    // Queue manipulation must happen under the queue lock.
    scoped_mutex(^c.mutex);

    // NOTE(review): the dequeue/post logic that actually wakes a
    // waiting thread appears to be missing from this body.
}
}
+//
+// Wakes up all threads from the wait-queue.
// Wakes up every thread currently in the wait-queue.
condition_broadcast :: (c: ^Condition_Variable) {
    scoped_mutex(^c.mutex);

    // Walk the entire queue: a broadcast must release every waiter,
    // and the null check avoids dereferencing an empty queue.
    while c.queue != null {
        semaphore_post(^c.queue.semaphore);
        c.queue = c.queue.next;
    }
}
-}
\ No newline at end of file
+}
use core.intrinsics.atomics
use core.thread { Thread_ID }
+//
+// A mutex represents a resource that can only be held by one
+// thread at a time. It is used to create sections of code that
+// only one thread can be in at a time.
+//
+// Mutexes in WebAssembly are very cheap, because they simply
+// use the atomic_cmpxchg intrinsic to operate. This only uses
+// memory, so no real resource allocation is necessary.
+//
// `lock` has two states: 0, and 1.
// 0 means unlocked
// 1 means locked
// To unlock it:
// Atomically set it to 0.
// Notify at most 1 other thread about this change.
-
+//
Mutex :: struct {
    lock : i32;          // 0 = unlocked, 1 = locked.
    owner : Thread_ID;   // Thread currently holding the lock; -1 when none.
}
+//
+// Initializes a new mutex.
// Prepares a mutex: unlocked, owned by nobody.
mutex_init :: (m: ^Mutex) {
    m.owner = -1;
    m.lock = 0;
}
+//
+// Destroys a mutex.
// Tears down a mutex, poisoning the lock word so any further use
// is distinguishable from a valid state.
mutex_destroy :: (m: ^Mutex) {
    m.owner = -1;
    m.lock = -1;
}
+//
+// Locks a mutex. If the mutex is currently held by another thread,
+// this function enters a spin loop until the mutex is unlocked.
+// In a JavaScript based implementation, the __atomic_wait intrinsic
+// is used to avoid having to spin loop.
// Locks a mutex, spinning until the lock word atomically
// transitions from 0 (unlocked) to 1 (locked). Re-acquiring a
// mutex this thread already holds returns immediately.
mutex_lock :: (m: ^Mutex) {
    while __atomic_cmpxchg(^m.lock, 0, 1) == 1 {
        // Lock already held: if it is held by this very thread,
        // treat the acquisition as re-entrant and return.
        if m.owner == context.thread_id do return;
    }

    // Record ownership only after the lock is actually acquired,
    // so mutex_unlock's owner check also works for uncontended
    // acquisitions (the owner write must not sit inside the
    // contention loop).
    m.owner = context.thread_id;
}
+//
+// Unlocks a mutex, if the calling thread currently holds the mutex.
+// In a JavaScript based implementation, the __atomic_notify intrinsic
+// is used to wake up one waiting thread.
// Unlocks a mutex, but only if the calling thread currently holds it.
mutex_unlock :: (m: ^Mutex) {
    // Only the owning thread may release the mutex.
    if m.owner != context.thread_id do return;

    // Clear ownership first, then release the lock word so another
    // thread's cmpxchg in mutex_lock can succeed.
    m.owner = -1;
    __atomic_store(^m.lock, 0);
}
+//
+// Helpful macro for making a particular block be protected by a mutex.
+//
+// m: sync.Mutex;
+// sync.mutex_init(^m);
+//
+// {
+// sync.scoped_mutex(^m);
+// // Everything here is done by one thread at a time.
+// }
+//
// Locks `m` for the remainder of the enclosing scope. Because this
// is a macro, the deferred unlock fires when the *caller's* scope
// exits.
scoped_mutex :: macro (m: ^Mutex) {
    ml :: mutex_lock
    mu :: mutex_unlock

    // Acquire now; without this call the macro never actually
    // locked the mutex.
    ml(m);
    defer mu(m);
}
+//
+// Abstracts the pattern described in scoped_mutex by automatically
+// calling scoped_mutex in the block of code given.
+//
+// m: sync.Mutex;
+// sync.mutex_init(^m);
+//
+// sync.critical_section(^m) {
+// // Everything here is done by one thread at a time.
+// }
+//
// Runs `body` while holding `m`; the mutex is released when the
// block finishes.
critical_section :: macro (m: ^Mutex, body: Code) -> i32 {
    scoped_mutex :: scoped_mutex;
    scoped_mutex(m);

    // Expand and execute the caller-provided block while the mutex
    // is held; without the #unquote the protected code never ran.
    #unquote body;

    return 0;
}
package core.sync
+//
+// Once is a thread-safe mechanism for executing a particular
+// function only once. It is simply a flag with a mutex.
+//
+
+//
+// Represents something that will only happen once.
Once :: struct {
    done: bool;     // Has the protected action already run?
    mutex: Mutex;   // Serializes the first execution.

    // NOTE(review): the struct appears truncated here — the closing
    // brace is missing and the #inject below likely belongs outside
    // the struct body. Verify against the original source.
    #inject Once.exec :: #match #local {}
+//
+// Run a function with no arguments once.
// Run a zero-argument function at most once.
#overload
Once.exec :: (o: ^Once, f: () -> $R) {
    scoped_mutex(^o.mutex);

    // Only the first caller runs `f`; later callers return
    // immediately. Without this check the function executed on
    // every call, defeating the purpose of Once.
    if o.done do return;
    o.done = true;

    f();
}
+//
+// Run a function with one argument once.
// Run a one-argument function at most once, forwarding `ctx`.
#overload
Once.exec :: (o: ^Once, ctx: $Ctx, f: (Ctx) -> $R) {
    scoped_mutex(^o.mutex);

    // Mirrors the zero-argument overload: only the first caller
    // runs `f`, with the caller-supplied context.
    if o.done do return;
    o.done = true;

    f(ctx);
}
use core.intrinsics.atomics
+//
+// A semaphore represents a counter that can only be incremented
+// and decremented by one thread at a time. "Waiting" on a semaphore
+// means decrementing the counter by 1 if it is greater than 0, otherwise
+// waiting until the counter is incremented. "Posting" on a semaphore
+// means incrementing the counter by a certain value, in turn releasing
+// other threads that might have been waiting for the value to change.
+//
+// Semaphores are generally used for controlling access to shared
+// resources. For a contrived example, say only 4 threads can use
+// a given network connection at a time. A semaphore would be created
+// with a value of 4. When a thread wants to use the network connection,
+// it would use `semaphore_wait` to obtain the resource, or wait if
+// the network is currently unavailable. When it is done using the
+// network, it would call `semaphore_post` to release the resource,
+// allowing another thread to use it.
+//
Semaphore :: struct {
    mutex : Mutex;    // Protects `counter`.
    counter : i32;    // Current value of the semaphore.
}
+//
+// Initializes a semaphore with the specified value.
// Prepares a semaphore whose counter starts at `value`.
semaphore_init :: (s: ^Semaphore, value: i32) {
    mutex_init(^s.mutex);
    s.counter = value;
}
+//
+// Destroys a semaphore.
// Destroys a semaphore. Only the mutex holds destroyable state;
// the counter needs no cleanup.
semaphore_destroy :: (s: ^Semaphore) {
    mutex_destroy(^s.mutex);
}
+//
+// Increment the counter in a semaphore by `count`.
// Increments the semaphore's counter by `count` (default 1), then
// notifies up to `count` waiters where wait/notify is available.
semaphore_post :: (s: ^Semaphore, count := 1) {
    // Posting zero is a no-op; skip taking the lock entirely.
    if count == 0 do return;

    scoped_mutex(^s.mutex);
    s.counter += count;

    // @Bug
    // This is susceptible to starvation. Semaphores should have a queue
    // or something like that.
    #if runtime.Wait_Notify_Available {
        __atomic_notify(^s.counter, maximum = count);
    }
}
+//
+// Waits until the thread is able to decrement one from the semaphore.
// Blocks until the calling thread is able to decrement the
// semaphore's counter by one.
semaphore_wait :: (s: ^Semaphore) {
    while true {
        mutex_lock(^s.mutex);

        // Claim a slot if one is available; otherwise release the
        // lock and retry. The previous body locked the mutex in a
        // loop without ever decrementing or unlocking, deadlocking
        // the caller on the second iteration.
        if s.counter > 0 {
            s.counter -= 1;
            mutex_unlock(^s.mutex);
            return;
        }

        mutex_unlock(^s.mutex);
    }
}
}
-}
\ No newline at end of file
+}