better process support on linux
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Thu, 2 Dec 2021 17:02:33 +0000 (11:02 -0600)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Thu, 2 Dec 2021 17:02:33 +0000 (11:02 -0600)
core/io/process.onyx [new file with mode: 0644]
core/io/reader.onyx
core/runtime/onyx_run.onyx
src/wasm_runtime.c

diff --git a/core/io/process.onyx b/core/io/process.onyx
new file mode 100644 (file)
index 0000000..bfa055e
--- /dev/null
@@ -0,0 +1,61 @@
+package core.io
+
+// Some thoughts about processes and the API.
+//
+// Should processes ever directly dump to standard output?
+// Or should their output always be buffered through a pipe to the user?
+//
+//
+
+
+#local runtime :: package runtime
+#if runtime.Runtime != runtime.Runtime_Onyx {
+    #error "This file can only be included in the 'onyx' runtime, because Wasi has not defined how to spawn and manage processes.";
+}
+
+Process :: struct {
+    Handle :: #distinct i64;
+
+    use stream: Stream;
+    process_handle: Handle;
+}
+
+process_spawn :: (path: str, args: [] str, non_blocking_io := false) -> Process {
+    handle := runtime.__process_spawn(path, args, non_blocking_io);
+
+    return .{
+        .{ ^process_stream_vtable },
+        handle,
+    };
+}
+
+process_kill :: (use p: ^Process) -> bool {
+    return runtime.__process_kill(process_handle);
+}
+
+process_wait :: (use p: ^Process) => {
+    return runtime.__process_wait(process_handle);
+}
+
+#local process_stream_vtable := Stream_Vtable.{
+    read = (use p: ^Process, buffer: [] u8) -> (Error, u32) {
+        // Read from the process stdout
+        if cast(i64) process_handle == 0 do return .BadFile, 0;
+
+        bytes_read := runtime.__process_read(process_handle, buffer);
+        return .None, bytes_read;
+    },
+
+    write = (use p: ^Process, buffer: [] u8) -> (Error, u32) {
+        // Write to the process stdin
+        if cast(i64) process_handle == 0 do return .BadFile, 0;
+
+        bytes_written := runtime.__process_write(process_handle, buffer);
+        return .None, bytes_written;
+    },
+
+    close = (use p: ^Process) -> Error {
+        process_kill(p);
+        return .None;
+    }
+}
index 6f5d005ebf062231b4f0018e873aaee5cd7d19b5..7d38953d22710e6773a8720bc984817d2bf8b09a 100644 (file)
@@ -381,7 +381,7 @@ skip_bytes :: (use reader: ^Reader, bytes: u32) -> (skipped: i32, err: Error) {
     }
 
     // Try to re-read multiple times
-    for 4 {
+    for 16 {
         err, n := stream_read(stream, buffer[end .. buffer.count]);
         end += n;
         if err != .None {
index a60156241feae9ac4c28f53890ad8afc021d4a63..f18a3847dd3cad6068edf58e020ad6fe9792d5d8 100644 (file)
@@ -17,10 +17,16 @@ use package wasi
     #export "_thread_exit"  _thread_exit
 }
 
-#local SpawnProcessResult :: enum {
+#load "core/io/process"
+
+#local ProcessResult :: enum {
     Success     :: 0x00;
     FailedToRun :: 0x01;
     Error       :: 0x02;
 }
 
-__spawn_process :: (path: str, args: [] str) -> SpawnProcessResult #foreign "env" "spawn_process" ---
\ No newline at end of file
+__process_spawn :: (path: str, args: [] str, non_blocking_io: bool) -> io.Process.Handle #foreign "env" "process_spawn" ---
+__process_read  :: (handle: io.Process.Handle, buffer: [] u8) -> u32 #foreign "env" "process_read" ---
+__process_write :: (handle: io.Process.Handle, buffer: [] u8) -> u32 #foreign "env" "process_write" ---
+__process_kill  :: (handle: io.Process.Handle) -> bool #foreign "env" "process_kill" ---
+__process_wait  :: (handle: io.Process.Handle) -> ProcessResult #foreign "env" "process_wait" ---
\ No newline at end of file
index 862904f8659bfff6fc52a211823b6d43b06d22ad..9ca7425755fb5e2b45e12d31b6001752dc1787f5 100644 (file)
@@ -58,6 +58,8 @@ wasm_extern_t* wasm_extern_lookup_by_name(wasm_module_t* module, wasm_instance_t
     return exports.data[idx];
 }
 
+#define WASM_INTEROP(name) static wasm_trap_t * name (const wasm_val_vec_t *params, wasm_val_vec_t *results)
+
 
 typedef struct OnyxThread {
     i32 id;
@@ -97,8 +99,14 @@ static i32 onyx_run_thread(void *data) {
 
     wasm_trap_t* trap=NULL;
 
+    // NOTE: This is cached in a local variable because there is a tiny chance that if a lot of threads are created
+    // then the backing array for the thread handles will move and thread* we have well not be valid. I'm betting on this
+    // not happening before now in the function; however, it *could* happen before we call thread_exit. This would be bad
+    // because of the normal reasons why accessing memory that you don't own any more is bad.
+    i32 thread_id = thread->id;
+
     { // Call the _thread_start procedure
-        wasm_val_t args[]    = { WASM_I32_VAL(thread->id), WASM_I32_VAL(thread->tls_base), WASM_I32_VAL (thread->funcidx), WASM_I32_VAL(thread->dataptr) };
+        wasm_val_t args[]    = { WASM_I32_VAL(thread_id), WASM_I32_VAL(thread->tls_base), WASM_I32_VAL (thread->funcidx), WASM_I32_VAL(thread->dataptr) };
         wasm_val_vec_t results;
         wasm_val_vec_t args_array = WASM_ARRAY_VEC(args);
 
@@ -114,7 +122,7 @@ static i32 onyx_run_thread(void *data) {
     }
 
     { // Call the _thread_exit procedure
-        wasm_val_t args[]    = { WASM_I32_VAL(thread->id) };
+        wasm_val_t args[]    = { WASM_I32_VAL(thread_id) };
         wasm_val_vec_t results;
         wasm_val_vec_t args_array = WASM_ARRAY_VEC(args);
 
@@ -124,7 +132,7 @@ static i32 onyx_run_thread(void *data) {
     return 0;
 }
 
-static wasm_trap_t* onyx_spawn_thread_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) {
+WASM_INTEROP(onyx_spawn_thread_impl) {
     if (threads == NULL) bh_arr_new(global_heap_allocator, threads, 128);
     bh_arr_insert_end(threads, 1);
     OnyxThread *thread = &bh_arr_last(threads);
@@ -146,7 +154,7 @@ static wasm_trap_t* onyx_spawn_thread_impl(const wasm_val_vec_t* params, wasm_va
     return NULL;
 }
 
-static wasm_trap_t* onyx_kill_thread_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) {
+WASM_INTEROP(onyx_kill_thread_impl) {
     i32 thread_id = params->data[0].of.i32;
 
     i32 i = 0;
@@ -172,18 +180,33 @@ static wasm_trap_t* onyx_kill_thread_impl(const wasm_val_vec_t* params, wasm_val
     return NULL;
 }
 
-static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) {
+#define ONYX_PROCESS_MAGIC_NUMBER 0xdeadfadebabecafe
+typedef struct OnyxProcess {
+    u64 magic_number;
+
+#ifdef _BH_LINUX
+    // Pipes
+    i32 proc_to_host[2];
+    i32 host_to_proc[2];
+
+    pid_t pid;
+#endif
+} OnyxProcess;
+
+WASM_INTEROP(onyx_process_spawn_impl) {
     char* process_str = (char *) wasm_memory_data(wasm_memory) + params->data[0].of.i32;
     i32   process_len = params->data[1].of.i32;
     i32   args_ptr    = params->data[2].of.i32;
     i32   args_len    = params->data[3].of.i32;
+    b32   blocking_io = !params->data[4].of.i32;
 
     char process_path[1024];
     process_len = bh_min(1023, process_len);
     memcpy(process_path, process_str, process_len);
     process_path[process_len] = '\0';
 
-    // CLEANUP: Make the return value from the Windows and Linux version mean the same thing!!!
+    OnyxProcess *process = bh_alloc_item(global_heap_allocator, OnyxProcess);
+    process->magic_number = ONYX_PROCESS_MAGIC_NUMBER;
 
     #ifdef _BH_LINUX
         char **process_args = bh_alloc_array(global_scratch_allocator, char *, args_len + 2);
@@ -200,22 +223,37 @@ static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_v
         }
         process_args[0] = process_path;
         process_args[args_len + 1] = NULL;
-        
-        switch (fork()) {
+
+        pipe(process->proc_to_host);
+        pipe(process->host_to_proc);
+
+        pid_t pid;
+        switch (pid = fork()) {
             case -1: // Bad fork
-                results->data[0] = WASM_I32_VAL(1); // Failed to run
+                wasm_val_init_ptr(&results->data[0], NULL); // Failed to run
                 break;
 
             case 0: // Child process
+                close(process->proc_to_host[0]);
+                close(process->host_to_proc[1]);
+                dup2(process->proc_to_host[1], 1); // Map the output to the pipe
+                dup2(process->host_to_proc[0], 0); // Map the output to the pipe
+
+                if (!blocking_io) {
+                    fcntl(0, F_SETFL, O_NONBLOCK);
+                    fcntl(1, F_SETFL, O_NONBLOCK);
+                }
+
                 execv(process_path, process_args);
+                wasm_val_init_ptr(&results->data[0], NULL);
+                break;
             
             default: {
-                i32 status;
-                wait(&status);
+                process->pid = pid;
+                close(process->host_to_proc[0]);
+                close(process->proc_to_host[1]);
 
-                i32 exit_status = WEXITSTATUS(status);
-            
-                results->data[0] = WASM_I32_VAL(exit_status != 0 ? 2 : 0); // Error if non-zero exit, Success if zero.
+                wasm_val_init_ptr(&results->data[0], process);
                 break;
             }
         }
@@ -258,6 +296,81 @@ static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_v
     return NULL;
 }
 
+WASM_INTEROP(onyx_process_read_impl) {
+    OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64;
+    if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) {
+        results->data[0] = WASM_I32_VAL(0);
+        return NULL;
+    }
+
+    i32 output_ptr = params->data[1].of.i32;
+    i32 output_len = params->data[2].of.i32;
+    u8 *buffer = wasm_memory_data(wasm_memory) + output_ptr;
+
+    i32 bytes_read;
+    #ifdef _BH_LINUX
+        bytes_read = read(process->proc_to_host[0], buffer, output_len);
+        bytes_read = bh_max(bytes_read, 0);  // Silently consume errors
+    #endif
+
+    results->data[0] = WASM_I32_VAL(bytes_read);
+    return NULL;
+}
+
+WASM_INTEROP(onyx_process_write_impl) {
+    OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64;
+    if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) {
+        results->data[0] = WASM_I32_VAL(0);
+        return NULL;
+    }
+
+    i32 input_ptr = params->data[1].of.i32;
+    i32 input_len = params->data[2].of.i32;
+    u8 *buffer = wasm_memory_data(wasm_memory) + input_ptr;
+
+    i32 bytes_written;
+    #ifdef _BH_LINUX
+        bytes_written = write(process->host_to_proc[1], buffer, input_len);
+        bytes_written = bh_max(bytes_written, 0);  // Silently consume errors
+    #endif
+
+    results->data[0] = WASM_I32_VAL(bytes_written);
+    return NULL;
+}
+
+WASM_INTEROP(onyx_process_kill_impl) {
+    OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64;
+    if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) {
+        results->data[0] = WASM_I32_VAL(0);
+        return NULL;
+    }
+
+    #ifdef _BH_LINUX
+        i32 failed = kill(process->pid, SIGKILL);
+        results->data[0] = WASM_I32_VAL(!failed);
+    #endif
+
+    return NULL;
+}
+
+WASM_INTEROP(onyx_process_wait_impl) {
+    OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64;
+    if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) {
+        results->data[0] = WASM_I32_VAL(1);
+        return NULL;
+    }
+
+    #ifdef _BH_LINUX
+        i32 status;
+        waitpid(process->pid, &status, 0);
+
+        i32 exit_code = WEXITSTATUS(status);
+        results->data[0] = WASM_I32_VAL(exit_code != 0 ? 2 : 0);
+    #endif
+
+    return NULL;
+}
+
 void onyx_run_wasm(bh_buffer wasm_bytes) {
     wasm_instance_t* instance = NULL;
     wasmer_features_t* features = NULL;
@@ -358,12 +471,55 @@ void onyx_run_wasm(bh_buffer wasm_bytes) {
                 goto import_found;
             }
 
-            if (wasm_name_equals_string(import_name, "spawn_process")) {
-                wasm_functype_t* func_type = wasm_functype_new_4_1(
-                    wasm_valtype_new_i32(), wasm_valtype_new_i32(), wasm_valtype_new_i32(), wasm_valtype_new_i32(),
+            if (wasm_name_equals_string(import_name, "process_spawn")) {
+                wasm_valtype_t* ps[5] = {
+                    wasm_valtype_new_i32(), wasm_valtype_new_i32(),
+                    wasm_valtype_new_i32(), wasm_valtype_new_i32(),
+                    wasm_valtype_new_i32()
+                };
+                wasm_valtype_t* rs[1] = { wasm_valtype_new_i64() };
+                wasm_valtype_vec_t params, results;
+                wasm_valtype_vec_new(&params, 5, ps);
+                wasm_valtype_vec_new(&results, 1, rs);
+                wasm_functype_t* func_type = wasm_functype_new(&params, &results);
+
+                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_spawn_impl);
+                import = wasm_func_as_extern(wasm_func);
+                goto import_found;
+            }
+
+            if (wasm_name_equals_string(import_name, "process_read")) {
+                wasm_functype_t* func_type = wasm_functype_new_3_1(
+                    wasm_valtype_new_i64(), wasm_valtype_new_i32(), wasm_valtype_new_i32(),
                     wasm_valtype_new_i32());
 
-                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_spawn_process_impl);
+                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_read_impl);
+                import = wasm_func_as_extern(wasm_func);
+                goto import_found;
+            }
+
+            if (wasm_name_equals_string(import_name, "process_write")) {
+                wasm_functype_t* func_type = wasm_functype_new_3_1(
+                    wasm_valtype_new_i64(), wasm_valtype_new_i32(), wasm_valtype_new_i32(),
+                    wasm_valtype_new_i32());
+
+                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_write_impl);
+                import = wasm_func_as_extern(wasm_func);
+                goto import_found;
+            }
+
+            if (wasm_name_equals_string(import_name, "process_kill")) {
+                wasm_functype_t* func_type = wasm_functype_new_1_1(wasm_valtype_new_i64(), wasm_valtype_new_i32());
+
+                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_kill_impl);
+                import = wasm_func_as_extern(wasm_func);
+                goto import_found;
+            }
+
+            if (wasm_name_equals_string(import_name, "process_wait")) {
+                wasm_functype_t* func_type = wasm_functype_new_1_1(wasm_valtype_new_i64(), wasm_valtype_new_i32());
+
+                wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_wait_impl);
                 import = wasm_func_as_extern(wasm_func);
                 goto import_found;
             }