From 9ffa76a7b4833a4ab619d0cabf224aa964d336bb Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Thu, 2 Dec 2021 11:02:33 -0600 Subject: [PATCH] better process support on linux --- core/io/process.onyx | 61 ++++++++++++ core/io/reader.onyx | 2 +- core/runtime/onyx_run.onyx | 10 +- src/wasm_runtime.c | 192 +++++++++++++++++++++++++++++++++---- 4 files changed, 244 insertions(+), 21 deletions(-) create mode 100644 core/io/process.onyx diff --git a/core/io/process.onyx b/core/io/process.onyx new file mode 100644 index 00000000..bfa055e5 --- /dev/null +++ b/core/io/process.onyx @@ -0,0 +1,61 @@ +package core.io + +// Some thoughts about processes and the API. +// +// Should processes ever directly dump to standard output? +// Or should their output always be buffered through a pipe to the user? +// +// + + +#local runtime :: package runtime +#if runtime.Runtime != runtime.Runtime_Onyx { + #error "This file can only be included in the 'onyx' runtime, because Wasi has not defined how to spawn and manage processes."; +} + +Process :: struct { + Handle :: #distinct i64; + + use stream: Stream; + process_handle: Handle; +} + +process_spawn :: (path: str, args: [] str, non_blocking_io := false) -> Process { + handle := runtime.__process_spawn(path, args, non_blocking_io); + + return .{ + .{ ^process_stream_vtable }, + handle, + }; +} + +process_kill :: (use p: ^Process) -> bool { + return runtime.__process_kill(process_handle); +} + +process_wait :: (use p: ^Process) => { + return runtime.__process_wait(process_handle); +} + +#local process_stream_vtable := Stream_Vtable.{ + read = (use p: ^Process, buffer: [] u8) -> (Error, u32) { + // Read from the process stdout + if cast(i64) process_handle == 0 do return .BadFile, 0; + + bytes_read := runtime.__process_read(process_handle, buffer); + return .None, bytes_read; + }, + + write = (use p: ^Process, buffer: [] u8) -> (Error, u32) { + // Write to the process stdin + if cast(i64) process_handle == 0 do return .BadFile, 0; + + bytes_written := runtime.__process_write(process_handle, buffer); + return .None, bytes_written; + }, + + close = (use p: ^Process) -> Error { + process_kill(p); + return .None; + } +} diff --git a/core/io/reader.onyx b/core/io/reader.onyx index 6f5d005e..7d38953d 100644 --- a/core/io/reader.onyx +++ b/core/io/reader.onyx @@ -381,7 +381,7 @@ skip_bytes :: (use reader: ^Reader, bytes: u32) -> (skipped: i32, err: Error) { } // Try to re-read multiple times - for 4 { + for 16 { err, n := stream_read(stream, buffer[end .. buffer.count]); end += n; if err != .None { diff --git a/core/runtime/onyx_run.onyx b/core/runtime/onyx_run.onyx index a6015624..f18a3847 100644 --- a/core/runtime/onyx_run.onyx +++ b/core/runtime/onyx_run.onyx @@ -17,10 +17,16 @@ use package wasi #export "_thread_exit" _thread_exit } -#local SpawnProcessResult :: enum { +#load "core/io/process" + +#local ProcessResult :: enum { Success :: 0x00; FailedToRun :: 0x01; Error :: 0x02; } -__spawn_process :: (path: str, args: [] str) -> SpawnProcessResult #foreign "env" "spawn_process" --- \ No newline at end of file +__process_spawn :: (path: str, args: [] str, non_blocking_io: bool) -> io.Process.Handle #foreign "env" "process_spawn" --- +__process_read :: (handle: io.Process.Handle, buffer: [] u8) -> u32 #foreign "env" "process_read" --- +__process_write :: (handle: io.Process.Handle, buffer: [] u8) -> u32 #foreign "env" "process_write" --- +__process_kill :: (handle: io.Process.Handle) -> bool #foreign "env" "process_kill" --- +__process_wait :: (handle: io.Process.Handle) -> ProcessResult #foreign "env" "process_wait" --- \ No newline at end of file diff --git a/src/wasm_runtime.c b/src/wasm_runtime.c index 862904f8..9ca74257 100644 --- a/src/wasm_runtime.c +++ b/src/wasm_runtime.c @@ -58,6 +58,8 @@ wasm_extern_t* wasm_extern_lookup_by_name(wasm_module_t* module, wasm_instance_t return exports.data[idx]; } +#define WASM_INTEROP(name) static wasm_trap_t * name (const wasm_val_vec_t *params, wasm_val_vec_t *results) + typedef struct OnyxThread { i32 id; @@ -97,8 +99,14 @@ static i32 onyx_run_thread(void *data) { wasm_trap_t* trap=NULL; + // NOTE: This is cached in a local variable because there is a tiny chance that if a lot of threads are created + // then the backing array for the thread handles will move and thread* we have well not be valid. I'm betting on this + // not happening before now in the function; however, it *could* happen before we call thread_exit. This would be bad + // because of the normal reasons why accessing memory that you don't own any more is bad. + i32 thread_id = thread->id; + { // Call the _thread_start procedure - wasm_val_t args[] = { WASM_I32_VAL(thread->id), WASM_I32_VAL(thread->tls_base), WASM_I32_VAL (thread->funcidx), WASM_I32_VAL(thread->dataptr) }; + wasm_val_t args[] = { WASM_I32_VAL(thread_id), WASM_I32_VAL(thread->tls_base), WASM_I32_VAL (thread->funcidx), WASM_I32_VAL(thread->dataptr) }; wasm_val_vec_t results; wasm_val_vec_t args_array = WASM_ARRAY_VEC(args); @@ -114,7 +122,7 @@ static i32 onyx_run_thread(void *data) { } { // Call the _thread_exit procedure - wasm_val_t args[] = { WASM_I32_VAL(thread->id) }; + wasm_val_t args[] = { WASM_I32_VAL(thread_id) }; wasm_val_vec_t results; wasm_val_vec_t args_array = WASM_ARRAY_VEC(args); @@ -124,7 +132,7 @@ static i32 onyx_run_thread(void *data) { return 0; } -static wasm_trap_t* onyx_spawn_thread_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) { +WASM_INTEROP(onyx_spawn_thread_impl) { if (threads == NULL) bh_arr_new(global_heap_allocator, threads, 128); bh_arr_insert_end(threads, 1); OnyxThread *thread = &bh_arr_last(threads); @@ -146,7 +154,7 @@ static wasm_trap_t* onyx_spawn_thread_impl(const wasm_val_vec_t* params, wasm_va return NULL; } -static wasm_trap_t* onyx_kill_thread_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) { +WASM_INTEROP(onyx_kill_thread_impl) { i32 thread_id = params->data[0].of.i32; i32 i = 0; @@ -172,18 +180,33 @@ static wasm_trap_t* onyx_kill_thread_impl(const wasm_val_vec_t* params, wasm_val return NULL; } -static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_val_vec_t* results) { +#define ONYX_PROCESS_MAGIC_NUMBER 0xdeadfadebabecafe +typedef struct OnyxProcess { + u64 magic_number; + +#ifdef _BH_LINUX + // Pipes + i32 proc_to_host[2]; + i32 host_to_proc[2]; + + pid_t pid; +#endif +} OnyxProcess; + +WASM_INTEROP(onyx_process_spawn_impl) { char* process_str = (char *) wasm_memory_data(wasm_memory) + params->data[0].of.i32; i32 process_len = params->data[1].of.i32; i32 args_ptr = params->data[2].of.i32; i32 args_len = params->data[3].of.i32; + b32 blocking_io = !params->data[4].of.i32; char process_path[1024]; process_len = bh_min(1023, process_len); memcpy(process_path, process_str, process_len); process_path[process_len] = '\0'; - // CLEANUP: Make the return value from the Windows and Linux version mean the same thing!!! + OnyxProcess *process = bh_alloc_item(global_heap_allocator, OnyxProcess); + process->magic_number = ONYX_PROCESS_MAGIC_NUMBER; #ifdef _BH_LINUX char **process_args = bh_alloc_array(global_scratch_allocator, char *, args_len + 2); @@ -200,22 +223,37 @@ static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_v } process_args[0] = process_path; process_args[args_len + 1] = NULL; - - switch (fork()) { + + pipe(process->proc_to_host); + pipe(process->host_to_proc); + + pid_t pid; + switch (pid = fork()) { case -1: // Bad fork - results->data[0] = WASM_I32_VAL(1); // Failed to run + wasm_val_init_ptr(&results->data[0], NULL); // Failed to run break; case 0: // Child process + close(process->proc_to_host[0]); + close(process->host_to_proc[1]); + dup2(process->proc_to_host[1], 1); // Map the output to the pipe + dup2(process->host_to_proc[0], 0); // Map the output to the pipe + + if (!blocking_io) { + fcntl(0, F_SETFL, O_NONBLOCK); + fcntl(1, F_SETFL, O_NONBLOCK); + } + execv(process_path, process_args); + wasm_val_init_ptr(&results->data[0], NULL); + break; default: { - i32 status; - wait(&status); + process->pid = pid; + close(process->host_to_proc[0]); + close(process->proc_to_host[1]); - i32 exit_status = WEXITSTATUS(status); - - results->data[0] = WASM_I32_VAL(exit_status != 0 ? 2 : 0); // Error if non-zero exit, Success if zero. + wasm_val_init_ptr(&results->data[0], process); break; } } @@ -258,6 +296,81 @@ static wasm_trap_t* onyx_spawn_process_impl(const wasm_val_vec_t* params, wasm_v return NULL; } +WASM_INTEROP(onyx_process_read_impl) { + OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64; + if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) { + results->data[0] = WASM_I32_VAL(0); + return NULL; + } + + i32 output_ptr = params->data[1].of.i32; + i32 output_len = params->data[2].of.i32; + u8 *buffer = wasm_memory_data(wasm_memory) + output_ptr; + + i32 bytes_read; + #ifdef _BH_LINUX + bytes_read = read(process->proc_to_host[0], buffer, output_len); + bytes_read = bh_max(bytes_read, 0); // Silently consume errors + #endif + + results->data[0] = WASM_I32_VAL(bytes_read); + return NULL; +} + +WASM_INTEROP(onyx_process_write_impl) { + OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64; + if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) { + results->data[0] = WASM_I32_VAL(0); + return NULL; + } + + i32 input_ptr = params->data[1].of.i32; + i32 input_len = params->data[2].of.i32; + u8 *buffer = wasm_memory_data(wasm_memory) + input_ptr; + + i32 bytes_written; + #ifdef _BH_LINUX + bytes_written = write(process->host_to_proc[1], buffer, input_len); + bytes_written = bh_max(bytes_written, 0); // Silently consume errors + #endif + + results->data[0] = WASM_I32_VAL(bytes_written); + return NULL; +} + +WASM_INTEROP(onyx_process_kill_impl) { + OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64; + if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) { + results->data[0] = WASM_I32_VAL(0); + return NULL; + } + + #ifdef _BH_LINUX + i32 failed = kill(process->pid, SIGKILL); + results->data[0] = WASM_I32_VAL(!failed); + #endif + + return NULL; +} + +WASM_INTEROP(onyx_process_wait_impl) { + OnyxProcess *process = (OnyxProcess *) params->data[0].of.i64; + if (process == NULL || process->magic_number != ONYX_PROCESS_MAGIC_NUMBER) { + results->data[0] = WASM_I32_VAL(1); + return NULL; + } + + #ifdef _BH_LINUX + i32 status; + waitpid(process->pid, &status, 0); + + i32 exit_code = WEXITSTATUS(status); + results->data[0] = WASM_I32_VAL(exit_code != 0 ? 2 : 0); + #endif + + return NULL; +} + void onyx_run_wasm(bh_buffer wasm_bytes) { wasm_instance_t* instance = NULL; wasmer_features_t* features = NULL; @@ -358,12 +471,55 @@ void onyx_run_wasm(bh_buffer wasm_bytes) { goto import_found; } - if (wasm_name_equals_string(import_name, "spawn_process")) { - wasm_functype_t* func_type = wasm_functype_new_4_1( - wasm_valtype_new_i32(), wasm_valtype_new_i32(), wasm_valtype_new_i32(), wasm_valtype_new_i32(), + if (wasm_name_equals_string(import_name, "process_spawn")) { + wasm_valtype_t* ps[5] = { + wasm_valtype_new_i32(), wasm_valtype_new_i32(), + wasm_valtype_new_i32(), wasm_valtype_new_i32(), + wasm_valtype_new_i32() + }; + wasm_valtype_t* rs[1] = { wasm_valtype_new_i64() }; + wasm_valtype_vec_t params, results; + wasm_valtype_vec_new(¶ms, 5, ps); + wasm_valtype_vec_new(&results, 1, rs); + wasm_functype_t* func_type = wasm_functype_new(¶ms, &results); + + wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_spawn_impl); + import = wasm_func_as_extern(wasm_func); + goto import_found; + } + + if (wasm_name_equals_string(import_name, "process_read")) { + wasm_functype_t* func_type = wasm_functype_new_3_1( + wasm_valtype_new_i64(), wasm_valtype_new_i32(), wasm_valtype_new_i32(), wasm_valtype_new_i32()); - wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_spawn_process_impl); + wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_read_impl); + import = wasm_func_as_extern(wasm_func); + goto import_found; + } + + if (wasm_name_equals_string(import_name, "process_write")) { + wasm_functype_t* func_type = wasm_functype_new_3_1( + wasm_valtype_new_i64(), wasm_valtype_new_i32(), wasm_valtype_new_i32(), + wasm_valtype_new_i32()); + + wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_write_impl); + import = wasm_func_as_extern(wasm_func); + goto import_found; + } + + if (wasm_name_equals_string(import_name, "process_kill")) { + wasm_functype_t* func_type = wasm_functype_new_1_1(wasm_valtype_new_i64(), wasm_valtype_new_i32()); + + wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_kill_impl); + import = wasm_func_as_extern(wasm_func); + goto import_found; + } + + if (wasm_name_equals_string(import_name, "process_wait")) { + wasm_functype_t* func_type = wasm_functype_new_1_1(wasm_valtype_new_i64(), wasm_valtype_new_i32()); + + wasm_func_t* wasm_func = wasm_func_new(wasm_store, func_type, onyx_process_wait_impl); import = wasm_func_as_extern(wasm_func); goto import_found; } -- 2.25.1