From: Brendan Hansen Date: Sun, 15 Oct 2023 03:36:24 +0000 (-0500) Subject: added: `io.stream_poll` and related functions X-Git-Url: https://git.brendanfh.com/?a=commitdiff_plain;h=606e6aa263a01994a492bb103680e554f68d9494;p=onyx.git added: `io.stream_poll` and related functions --- diff --git a/CHANGELOG b/CHANGELOG index 7fba5704..35d01b9f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,17 @@ +Release v0.1.7 +-------------- +Unreleased + +Additions: + +Removals: + +Changes: + +Bugfixes: + + + Release v0.1.6 ----------- 24th September 2023 diff --git a/compiler/src/onyx.c b/compiler/src/onyx.c index f0f03ce8..dfad34a0 100644 --- a/compiler/src/onyx.c +++ b/compiler/src/onyx.c @@ -934,7 +934,7 @@ static i32 onyx_compile() { if (context.options->verbose_output > 0) { // TODO: Replace these with bh_printf when padded formatting is added. printf("\nStatistics:\n"); - printf(" Time taken: %lf seconds\n", (double) duration / 1000); + printf(" Time taken: %lf ms\n", (double) duration); printf(" Processed %ld lines (%f lines/second).\n", context.lexer_lines_processed, ((f32) 1000 * context.lexer_lines_processed) / (duration)); printf(" Processed %ld tokens (%f tokens/second).\n", context.lexer_tokens_processed, ((f32) 1000 * context.lexer_tokens_processed) / (duration)); printf("\n"); diff --git a/core/io/stdio.onyx b/core/io/stdio.onyx index f9c83204..53f84366 100644 --- a/core/io/stdio.onyx +++ b/core/io/stdio.onyx @@ -214,5 +214,13 @@ __byte_dump :: (ptr: rawptr, byte_count: u32, bytes_per_line := 8) { flush = (_: &io.Stream) -> io.Error { __flush_stdio(); return .None; + }, + + poll = (_: &io.Stream, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) { + // TODO: This might not be right, as maybe sometimes you can't actually + // write because a the output was closed? + if ev == .Write do return .None, true; + + return .None, runtime.platform.__wait_for_input(timeout); } } diff --git a/core/io/stream.onyx b/core/io/stream.onyx index f6de5cb5..4063f2e9 100644 --- a/core/io/stream.onyx +++ b/core/io/stream.onyx @@ -32,6 +32,15 @@ Stream_Vtable :: struct { flush : (s: &Stream) -> Error = null_proc; size : (s: &Stream) -> i32 = null_proc; + + poll : (s: &Stream, ev: PollEvent, timeout: i32) -> (Error, bool) = null_proc; +} + +PollEvent :: enum { + None :: 0x00; + Read :: 0x01; + Write :: 0x02; + Closed :: 0x03; } SeekFrom :: enum { @@ -134,6 +143,22 @@ stream_size :: (use s: &Stream) -> i32 { return vtable.size(s); } +#doc """ + Waits until a stream is able to be read from or written to. + + If `timeout` < 0, then there is an indefinite timeout. + + If `timeout` = 0, then there is no timeout, and this function returns immediately. + + If `timeout` > 0, then there is a `timeout` millisecond delay before returning false. +""" +stream_poll :: (use s: &Stream, ev: PollEvent, timeout: i32) -> (Error, bool) { + if vtable == null do return .NoVtable, false; + if vtable.poll == null_proc do return .NotImplemented, false; + + return vtable.poll(s, ev, timeout); +} + // // BufferStream @@ -344,6 +369,14 @@ buffer_stream_vtable := Stream_Vtable.{ } return .None; + }, + + poll = (use dss: &BufferStream, ev: io.PollEvent, timeout: i32) -> (Error, bool) { + if ev == .Write && !write_enabled { + return .None, false; + } + + return .None, true; } } diff --git a/core/net/net.onyx b/core/net/net.onyx index 4c3cdf44..a866c978 100644 --- a/core/net/net.onyx +++ b/core/net/net.onyx @@ -180,22 +180,35 @@ Socket_Poll_Status :: enum { socket_poll_all :: (sockets: [] &Socket, timeout := -1, stat_buff: [] Socket_Poll_Status = .[]) { if sockets.count > stat_buff.count do return; - handles := alloc.array_from_stack(Socket.Handle, sockets.count); + handles := alloc.array_from_stack(runtime.platform.PollDescription, sockets.count); for i: sockets.count { - handles[i] = sockets[i].handle; + handles[i] = .{ + ~~cast(i32) sockets[i].handle, .Read + }; } - __net_poll_recv(handles, timeout, stat_buff.data); + runtime.platform.__poll(handles, timeout); + for i: sockets.count { + stat_buff[i] = switch handles[i].out { + case .None => .No_Change + case .Read => .Readable + case .Closed => .Closed + }; + } } socket_poll :: (socket: &Socket, timeout := -1) -> Socket_Poll_Status { - handles := alloc.array_from_stack(Socket.Handle, 1); - handles[0] = socket.handle; + fds := runtime.platform.PollDescription.[ + .{ ~~cast(i32) socket.handle, .Read } + ]; - stat: Socket_Poll_Status; - __net_poll_recv(handles, timeout, &stat); + runtime.platform.__poll(fds, timeout); - return stat; + return switch fds[0].out { + case .None => .No_Change + case .Read => .Readable + case .Closed => .Closed + }; } socket_send :: (s: &Socket, data: [] u8) -> i32 { @@ -293,6 +306,18 @@ network_to_host :: #match #local {} return .None, bytes_written; }, + poll = (use s: &Socket, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) { + if ev == .Write do return .None, true; + + status := socket_poll(s, timeout); + + if status == .Closed { + return .EOF, false; + } + + return .None, status == .Readable; + }, + close = (use p: &Socket) -> io.Error { __net_close_socket(handle); return .None; @@ -318,7 +343,6 @@ network_to_host :: #match #local {} #package __net_sendto :: (handle: Socket.Handle, data: [] u8, addr: &Socket_Address) -> i32 --- #package __net_recv :: (handle: Socket.Handle, data: [] u8, async_would_block: &bool) -> i32 --- #package __net_recvfrom :: (handle: Socket.Handle, data: [] u8, out_recv_addr: &Socket_Address, async_would_block: &bool) -> i32 --- - #package __net_poll_recv :: (handle: [] Socket.Handle, timeout: i32, out_statuses: &Socket_Poll_Status) -> void --- #package __net_host_to_net_s :: (s: u16) -> u16 --- #package __net_host_to_net_l :: (s: u32) -> u32 --- diff --git a/core/runtime/platform/js/platform.onyx b/core/runtime/platform/js/platform.onyx index 0b4e8a25..5f4d5604 100644 --- a/core/runtime/platform/js/platform.onyx +++ b/core/runtime/platform/js/platform.onyx @@ -31,6 +31,8 @@ __time :: () -> i64 #foreign "host" "time" --- __read_from_input :: (buf: [] u8) -> u32 do return 0; } +__wait_for_input :: (timeout: i32) => true; + __futex_wait :: (addr: rawptr, expected: i32, timeout: i32) -> i32 { use core.intrinsics.atomics {__atomic_wait} if context.thread_id != 0 { diff --git a/core/runtime/platform/onyx/fs.onyx b/core/runtime/platform/onyx/fs.onyx index 7a3e8345..64a1da3d 100644 --- a/core/runtime/platform/onyx/fs.onyx +++ b/core/runtime/platform/onyx/fs.onyx @@ -5,6 +5,17 @@ use core {*} FileData :: #distinct i64 DirectoryData :: #distinct u64 +PollDescription :: struct { + fd: u64; + in: io.PollEvent; + out: io.PollEvent; +} + +#foreign "onyx_runtime" { + __poll :: (fds: [] PollDescription, timeout: i32) -> void --- +} + + #local { #foreign "onyx_runtime" { __file_open_impl :: (path: str, mode: os.OpenMode, out_handle: &FileData) -> os.FileError --- @@ -110,5 +121,21 @@ __file_stream_vtable := io.Stream_Vtable.{ size = (use fs: &os.File) -> i32 { return __file_size(data); }, + + poll = (use fs: &os.File, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) { + fds: [1] PollDescription; + fds[0] = .{ + fd = ~~data, + in = ev + }; + + __poll(fds, timeout); + + if fds[0].out == .Closed { + return .EOF, ev == .Closed; + } + + return .None, fds[0].out == ev; + } }; diff --git a/core/runtime/platform/onyx/platform.onyx b/core/runtime/platform/onyx/platform.onyx index b317170b..7325bce9 100644 --- a/core/runtime/platform/onyx/platform.onyx +++ b/core/runtime/platform/onyx/platform.onyx @@ -55,6 +55,12 @@ __read_from_input :: (buffer: [] u8) -> i32 { return read; } +__wait_for_input :: (timeout: i32) -> bool { + err, ready := io.stream_poll(&__stdin, .Read, timeout); + if err != .None do return false; + return ready; +} + ProcessData :: #distinct u64 diff --git a/core/runtime/platform/wasi/platform.onyx b/core/runtime/platform/wasi/platform.onyx index 8722ad90..85dd6f86 100644 --- a/core/runtime/platform/wasi/platform.onyx +++ b/core/runtime/platform/wasi/platform.onyx @@ -63,6 +63,45 @@ __read_from_input :: (buffer: [] u8) -> i32 { return read; } +__wait_for_input :: (timeout: i32) -> bool { + events := 1; + subscriptions: [2] Subscription; + + fd_tagged: SubscriptionTagged; + fd_tagged.tag = .FDRead; + fd_tagged.fd_read = .{ file_description = STDIN_FILENO, }; + subscriptions[0] = .{ + userdata = 0, + u = tagged, + }; + + clock_tagged: SubscriptionTagged; + if timeout > 0 { + events += 1; + clock_tagged.tag = .Clock; + clock_tagged.clock = .{ + id = .Realtime, + timeout = cast(u64) timeout * 1000000, + precision + }; + + subscriptions[1] = .{ + userdata = 0, + u = clock_tagged, + }; + } + + event: [2] Event; + number_of_events: u32; + + error_code := poll_oneoff(&subscription, &event, 2, &number_of_events); + + if number_of_events == 0 do return false; + if event[0].type == .Clock do return false; + + return event[0].fd_readwrite.nbytes > 0; +} + __sleep :: (milliseconds: u32) { tagged: SubscriptionTagged; tagged.tag = .Clock; diff --git a/core/runtime/platform/wasi/wasi_fs.onyx b/core/runtime/platform/wasi/wasi_fs.onyx index 011b1231..b2589dbd 100644 --- a/core/runtime/platform/wasi/wasi_fs.onyx +++ b/core/runtime/platform/wasi/wasi_fs.onyx @@ -258,6 +258,46 @@ __file_stream_vtable := io.Stream_Vtable.{ return ~~ file_stat.size; }, + + poll = (use fs: &os.File, ev: io.PollEvent, timeout: i32) -> (Error, bool) { + // TODO: Not entirely sure this is implemented correctly. + events := 1; + subscriptions: [2] Subscription; + + fd_tagged: SubscriptionTagged; + fd_tagged.tag = .FDRead; + fd_tagged.fd_read = .{ file_description = cast(i64) data, }; + subscriptions[0] = .{ + userdata = 0, + u = tagged, + }; + + clock_tagged: SubscriptionTagged; + if timeout > 0 { + events += 1; + clock_tagged.tag = .Clock; + clock_tagged.clock = .{ + id = .Realtime, + timeout = cast(u64) timeout * 1000000, + precision + }; + + subscriptions[1] = .{ + userdata = 0, + u = clock_tagged, + }; + } + + event: [2] Event; + number_of_events: u32; + + error_code := poll_oneoff(&subscription, &event, 2, &number_of_events); + + if number_of_events == 0 do return .None, false; + if event[0].type == .Clock do return .None, false; + + return .None, event[0].fd_readwrite.nbytes > 0; + } } diff --git a/interpreter/include/vm.h b/interpreter/include/vm.h index 77220270..c6b3c6d1 100644 --- a/interpreter/include/vm.h +++ b/interpreter/include/vm.h @@ -58,7 +58,6 @@ struct ovm_static_integer_array_t { #define OVM_TYPE_V128 0x07 struct ovm_value_t { - ovm_valtype_t type; union { i8 i8; i16 i16; @@ -71,6 +70,7 @@ struct ovm_value_t { f32 f32; f64 f64; }; + ovm_valtype_t type; }; diff --git a/runtime/onyx_runtime.c b/runtime/onyx_runtime.c index 24dc0d1d..c80097bf 100644 --- a/runtime/onyx_runtime.c +++ b/runtime/onyx_runtime.c @@ -52,6 +52,7 @@ ONYX_LIBRARY { ONYX_FUNC(__file_size) ONYX_FUNC(__file_get_standard) ONYX_FUNC(__file_rename) + ONYX_FUNC(__poll) ONYX_FUNC(__dir_open) ONYX_FUNC(__dir_read) @@ -96,7 +97,6 @@ ONYX_LIBRARY { ONYX_FUNC(__net_sendto) ONYX_FUNC(__net_recv) ONYX_FUNC(__net_recvfrom) - ONYX_FUNC(__net_poll_recv) ONYX_FUNC(__net_host_to_net_s) ONYX_FUNC(__net_host_to_net_l) ONYX_FUNC(__net_net_to_host_s) diff --git a/runtime/src/ort_net.h b/runtime/src/ort_net.h index 3550862b..caa407ef 100644 --- a/runtime/src/ort_net.h +++ b/runtime/src/ort_net.h @@ -324,40 +324,6 @@ ONYX_DEF(__net_recvfrom, (WASM_I32, WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WA return NULL; } -ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), ()) { - #ifdef _BH_LINUX - int i, res, cursor; - struct pollfd* fds; - - fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd)); - - for (i=0; i < params->data[1].of.i32; i++) { - fds[i].fd = *(i32 *) ONYX_PTR(params->data[0].of.i32 + 4 * i); - fds[i].events = POLLIN; - fds[i].revents = 0; - } - - res = poll(fds, params->data[1].of.i32, params->data[2].of.i32); - - for (i=0; idata[1].of.i32; i++) { - *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 0; // NO_CHANGE - - if (fds[i].revents & POLLIN) { - *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 1; // READABLE - } - - if ((fds[i].revents & POLLHUP) - || (fds[i].revents & POLLNVAL) - || (fds[i].revents & POLLERR)) { - *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 2; // CLOSED - } - } - - #endif - - return NULL; -} - ONYX_DEF(__net_host_to_net_s, (WASM_I32), (WASM_I32)) { results->data[0] = WASM_I32_VAL(htons(params->data[0].of.i32)); return NULL; diff --git a/runtime/src/ort_os.h b/runtime/src/ort_os.h index 5f875e59..403f5b70 100644 --- a/runtime/src/ort_os.h +++ b/runtime/src/ort_os.h @@ -53,6 +53,52 @@ ONYX_DEF(__time, (), (WASM_I64)) { return NULL; } +// ([] PollDescription, timeout: i32) -> void +// PollDescription :: struct { fd: i64; in_event: PollEvent; out_event: PollEvent; } +ONYX_DEF(__poll, (WASM_I32, WASM_I32, WASM_I32), ()) { + #ifdef _BH_LINUX + struct pollfd* fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd)); + + for (int i=0; i < params->data[1].of.i32; i++) { + fds[i].fd = *(i64 *) ONYX_PTR(params->data[0].of.i32 + 16 * i); + fds[i].revents = 0; + fds[i].events = 0; + + switch (*(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 8)) { + case 0x01: // Read + fds[i].events = POLLIN; + break; + + case 0x02: // Write + fds[i].events = POLLOUT; + break; + } + } + + int res = poll(fds, params->data[1].of.i32, params->data[2].of.i32); + + for (int i=0; idata[1].of.i32; i++) { + *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 0; // NO_CHANGE + + if (fds[i].revents & POLLIN) { + *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 1; // READABLE + } + + if (fds[i].revents & POLLOUT) { + *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 2; // WRITABLE + } + + if ((fds[i].revents & POLLHUP) + || (fds[i].revents & POLLNVAL) + || (fds[i].revents & POLLERR)) { + *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 3; // CLOSED + } + } + #endif + + return NULL; +} + ONYX_DEF(__lookup_env, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WASM_I32)) {