added: `io.stream_poll` and related functions
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Sun, 15 Oct 2023 03:36:24 +0000 (22:36 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Sun, 15 Oct 2023 03:36:24 +0000 (22:36 -0500)
14 files changed:
CHANGELOG
compiler/src/onyx.c
core/io/stdio.onyx
core/io/stream.onyx
core/net/net.onyx
core/runtime/platform/js/platform.onyx
core/runtime/platform/onyx/fs.onyx
core/runtime/platform/onyx/platform.onyx
core/runtime/platform/wasi/platform.onyx
core/runtime/platform/wasi/wasi_fs.onyx
interpreter/include/vm.h
runtime/onyx_runtime.c
runtime/src/ort_net.h
runtime/src/ort_os.h

index 7fba5704fedeb43a8167f1d643f41ea9e40d5b48..35d01b9fa46a3c8ce9c64b8bb8a1df06b5d51ee7 100644 (file)
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,17 @@
+Release v0.1.7
+--------------
+Unreleased
+
+Additions:
+
+Removals:
+
+Changes:
+
+Bugfixes:
+
+
+
 Release v0.1.6
 -----------
 24th September 2023
index f0f03ce84ff546006b6926351f68c17ddf95127c..dfad34a0e01713945ca129d38f938eb268cc8a61 100644 (file)
@@ -934,7 +934,7 @@ static i32 onyx_compile() {
     if (context.options->verbose_output > 0) {
         // TODO: Replace these with bh_printf when padded formatting is added.
         printf("\nStatistics:\n");
-        printf("    Time taken: %lf seconds\n", (double) duration / 1000);
+        printf("    Time taken: %lf ms\n", (double) duration);
         printf("    Processed %ld lines (%f lines/second).\n", context.lexer_lines_processed, ((f32) 1000 * context.lexer_lines_processed) / (duration));
         printf("    Processed %ld tokens (%f tokens/second).\n", context.lexer_tokens_processed, ((f32) 1000 * context.lexer_tokens_processed) / (duration));
         printf("\n");
index f9c832045ad09b996600b89c92050203d1d9a8f0..53f84366f52f011121265bdbc6b261e833f01a88 100644 (file)
@@ -214,5 +214,13 @@ __byte_dump :: (ptr: rawptr, byte_count: u32, bytes_per_line := 8) {
     flush = (_: &io.Stream) -> io.Error {
         __flush_stdio();
         return .None;
+    },
+
+    poll = (_: &io.Stream, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) {
+        // TODO: This might not be right, as maybe sometimes you can't actually
+        // write because a the output was closed?
+        if ev == .Write do return .None, true;
+
+        return .None, runtime.platform.__wait_for_input(timeout);
     }
 }
index f6de5cb5abec17936a1938a7569904065f9e848e..4063f2e915ae485552fe03ad0dc77017e52ab13b 100644 (file)
@@ -32,6 +32,15 @@ Stream_Vtable :: struct {
     flush        : (s: &Stream) -> Error                                       = null_proc;
 
     size         : (s: &Stream) -> i32                                         = null_proc;
+
+    poll         : (s: &Stream, ev: PollEvent, timeout: i32) -> (Error, bool)  = null_proc;
+}
+
+PollEvent :: enum {
+    None :: 0x00;
+    Read :: 0x01;
+    Write :: 0x02;
+    Closed :: 0x03;
 }
 
 SeekFrom :: enum {
@@ -134,6 +143,22 @@ stream_size :: (use s: &Stream) -> i32 {
     return vtable.size(s);
 }
 
+#doc """
+    Waits until a stream is able to be read from or written to.
+
+    If `timeout` < 0, then there is an indefinite timeout.
+    
+    If `timeout` = 0, then there is no timeout, and this function returns immediately.
+
+    If `timeout` > 0, then there is a `timeout` millisecond delay before returning false.
+"""
+stream_poll :: (use s: &Stream, ev: PollEvent, timeout: i32) -> (Error, bool) {
+    if vtable == null do return .NoVtable, false;
+    if vtable.poll == null_proc do return .NotImplemented, false;
+
+    return vtable.poll(s, ev, timeout);
+}
+
 
 //
 // BufferStream
@@ -344,6 +369,14 @@ buffer_stream_vtable := Stream_Vtable.{
         }
 
         return .None;
+    },
+
+    poll = (use dss: &BufferStream, ev: io.PollEvent, timeout: i32) -> (Error, bool) {
+        if ev == .Write && !write_enabled {
+            return .None, false;
+        }
+
+        return .None, true;
     }
 }
 
index 4c3cdf449b5a30d632fb97e32c4cdc13b24be5a9..a866c97814e7866cad4fa99fe91b823831ad3ae9 100644 (file)
@@ -180,22 +180,35 @@ Socket_Poll_Status :: enum {
 socket_poll_all :: (sockets: [] &Socket, timeout := -1, stat_buff: [] Socket_Poll_Status = .[]) {
     if sockets.count > stat_buff.count do return;
 
-    handles := alloc.array_from_stack(Socket.Handle, sockets.count);
+    handles := alloc.array_from_stack(runtime.platform.PollDescription, sockets.count);
     for i: sockets.count {
-        handles[i] = sockets[i].handle;
+        handles[i] = .{
+            ~~cast(i32) sockets[i].handle, .Read
+        };
     }
 
-    __net_poll_recv(handles, timeout, stat_buff.data);
+    runtime.platform.__poll(handles, timeout);
+    for i: sockets.count {
+        stat_buff[i] = switch handles[i].out {
+            case .None => .No_Change
+            case .Read => .Readable
+            case .Closed => .Closed
+        };
+    }
 }
 
 socket_poll :: (socket: &Socket, timeout := -1) -> Socket_Poll_Status {
-    handles := alloc.array_from_stack(Socket.Handle, 1);
-    handles[0] = socket.handle;
+    fds := runtime.platform.PollDescription.[
+        .{ ~~cast(i32) socket.handle, .Read }
+    ];
 
-    stat: Socket_Poll_Status;
-    __net_poll_recv(handles, timeout, &stat);
+    runtime.platform.__poll(fds, timeout);
 
-    return stat;
+    return switch fds[0].out {
+        case .None => .No_Change
+        case .Read => .Readable
+        case .Closed => .Closed
+    };
 }
 
 socket_send :: (s: &Socket, data: [] u8) -> i32 {
@@ -293,6 +306,18 @@ network_to_host :: #match #local {}
         return .None, bytes_written;
     },
 
+    poll = (use s: &Socket, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) {
+        if ev == .Write do return .None, true;
+
+        status := socket_poll(s, timeout);
+
+        if status == .Closed {
+            return .EOF, false;
+        }
+
+        return .None, status == .Readable;
+    },
+
     close = (use p: &Socket) -> io.Error {
         __net_close_socket(handle);
         return .None;
@@ -318,7 +343,6 @@ network_to_host :: #match #local {}
     #package __net_sendto        :: (handle: Socket.Handle, data: [] u8, addr: &Socket_Address)  -> i32 ---
     #package __net_recv          :: (handle: Socket.Handle, data: [] u8, async_would_block: &bool) -> i32 ---
     #package __net_recvfrom      :: (handle: Socket.Handle, data: [] u8, out_recv_addr: &Socket_Address, async_would_block: &bool) -> i32 ---
-    #package __net_poll_recv     :: (handle: [] Socket.Handle, timeout: i32, out_statuses: &Socket_Poll_Status) -> void ---
 
     #package __net_host_to_net_s :: (s: u16) -> u16 ---
     #package __net_host_to_net_l :: (s: u32) -> u32 ---
index 0b4e8a25131b612ade5c2b06d6e5858fc529e359..5f4d560433984e2ca47fa068ccca13bfae1fa2eb 100644 (file)
@@ -31,6 +31,8 @@ __time            :: ()            -> i64  #foreign "host" "time" ---
     __read_from_input :: (buf: [] u8)  -> u32 do return 0;
 }
 
+__wait_for_input :: (timeout: i32) => true;
+
 __futex_wait :: (addr: rawptr, expected: i32, timeout: i32) -> i32 {
     use core.intrinsics.atomics {__atomic_wait}
     if context.thread_id != 0 {
index 7a3e8345b5c2f6f425198cf4a00ee764bc31b7e0..64a1da3d28184fc216263a7cfa7ea6fbf3471bc0 100644 (file)
@@ -5,6 +5,17 @@ use core {*}
 FileData :: #distinct i64
 DirectoryData :: #distinct u64
 
+PollDescription :: struct {
+    fd: u64;
+    in: io.PollEvent;
+    out: io.PollEvent;
+}
+
+#foreign "onyx_runtime" {
+    __poll :: (fds: [] PollDescription, timeout: i32) -> void ---
+}
+
+
 #local {
     #foreign "onyx_runtime" {
         __file_open_impl :: (path: str, mode: os.OpenMode, out_handle: &FileData) -> os.FileError ---
@@ -110,5 +121,21 @@ __file_stream_vtable := io.Stream_Vtable.{
     size = (use fs: &os.File) -> i32 {
         return __file_size(data);
     },
+
+    poll = (use fs: &os.File, ev: io.PollEvent, timeout: i32) -> (io.Error, bool) {
+        fds: [1] PollDescription;
+        fds[0] = .{
+            fd = ~~data,
+            in = ev
+        };
+
+        __poll(fds, timeout);
+
+        if fds[0].out == .Closed {
+            return .EOF, ev == .Closed;
+        }
+
+        return .None, fds[0].out == ev;
+    }
 };
 
index b317170b75e37c0d637761f2c59ba5baf601ae54..7325bce90f173cb586db983343a2068e7099371c 100644 (file)
@@ -55,6 +55,12 @@ __read_from_input :: (buffer: [] u8) -> i32 {
     return read;
 }
 
+__wait_for_input :: (timeout: i32) -> bool {
+    err, ready := io.stream_poll(&__stdin, .Read, timeout);
+    if err != .None do return false;
+    return ready;
+}
+
 
 ProcessData :: #distinct u64
 
index 8722ad90af54c5a05dd26f99bdbf9d8e1867ee8a..85dd6f866f7129f0a89adbae1d3ffa92a6722116 100644 (file)
@@ -63,6 +63,45 @@ __read_from_input :: (buffer: [] u8) -> i32 {
     return read;
 }
 
+__wait_for_input :: (timeout: i32) -> bool {
+    events := 1;
+    subscriptions: [2] Subscription;
+
+    fd_tagged: SubscriptionTagged;
+    fd_tagged.tag = .FDRead;
+    fd_tagged.fd_read = .{ file_description = STDIN_FILENO, };
+    subscriptions[0] = .{
+        userdata = 0,
+        u = tagged,
+    };
+
+    clock_tagged: SubscriptionTagged;
+    if timeout > 0 {
+        events += 1;
+        clock_tagged.tag = .Clock;
+        clock_tagged.clock = .{
+            id = .Realtime,
+            timeout = cast(u64) timeout * 1000000,
+            precision
+        };
+
+        subscriptions[1] = .{
+            userdata = 0,
+            u = clock_tagged,
+        };
+    }
+
+    event: [2] Event;
+    number_of_events: u32;
+
+    error_code := poll_oneoff(&subscription, &event, 2, &number_of_events);
+
+    if number_of_events == 0 do return false;
+    if event[0].type == .Clock do return false;
+
+    return event[0].fd_readwrite.nbytes > 0;
+}
+
 __sleep :: (milliseconds: u32) {
     tagged: SubscriptionTagged;
     tagged.tag = .Clock;
index 011b1231f5ca9dc603d2e583a874e8f86126c417..b2589dbd1a514d59044b339dec03c402d605e89f 100644 (file)
@@ -258,6 +258,46 @@ __file_stream_vtable := io.Stream_Vtable.{
 
         return ~~ file_stat.size;
     },
+
+    poll = (use fs: &os.File, ev: io.PollEvent, timeout: i32) -> (Error, bool) {
+        // TODO: Not entirely sure this is implemented correctly.
+        events := 1;
+        subscriptions: [2] Subscription;
+
+        fd_tagged: SubscriptionTagged;
+        fd_tagged.tag = .FDRead;
+        fd_tagged.fd_read = .{ file_description = cast(i64) data, };
+        subscriptions[0] = .{
+            userdata = 0,
+            u = tagged,
+        };
+
+        clock_tagged: SubscriptionTagged;
+        if timeout > 0 {
+            events += 1;
+            clock_tagged.tag = .Clock;
+            clock_tagged.clock = .{
+                id = .Realtime,
+                timeout = cast(u64) timeout * 1000000,
+                precision
+            };
+
+            subscriptions[1] = .{
+                userdata = 0,
+                u = clock_tagged,
+            };
+        }
+
+        event: [2] Event;
+        number_of_events: u32;
+
+        error_code := poll_oneoff(&subscription, &event, 2, &number_of_events);
+
+        if number_of_events == 0 do return .None, false;
+        if event[0].type == .Clock do return .None, false;
+
+        return .None, event[0].fd_readwrite.nbytes > 0;
+    }
 }
 
 
index 77220270db78d43e9076c819a0653491bd102617..c6b3c6d1f4660a9203e97a4728a519f7c0a20420 100644 (file)
@@ -58,7 +58,6 @@ struct ovm_static_integer_array_t {
 #define OVM_TYPE_V128   0x07
 
 struct ovm_value_t {
-    ovm_valtype_t type;
     union {
         i8  i8;
         i16 i16;
@@ -71,6 +70,7 @@ struct ovm_value_t {
         f32 f32;
         f64 f64;
     };
+    ovm_valtype_t type;
 };
 
 
index 24dc0d1d213cf20ef1fcdf66051959009108b3bd..c80097bf951e73176622b2e9c40a6153b4fd97bc 100644 (file)
@@ -52,6 +52,7 @@ ONYX_LIBRARY {
     ONYX_FUNC(__file_size)
     ONYX_FUNC(__file_get_standard)
     ONYX_FUNC(__file_rename)
+    ONYX_FUNC(__poll)
 
     ONYX_FUNC(__dir_open)
     ONYX_FUNC(__dir_read)
@@ -96,7 +97,6 @@ ONYX_LIBRARY {
     ONYX_FUNC(__net_sendto)
     ONYX_FUNC(__net_recv)
     ONYX_FUNC(__net_recvfrom)
-    ONYX_FUNC(__net_poll_recv)
     ONYX_FUNC(__net_host_to_net_s)
     ONYX_FUNC(__net_host_to_net_l)
     ONYX_FUNC(__net_net_to_host_s)
index 3550862b9e6470f3e35fc06ed97629d775921ae0..caa407ef322bb2012ed19b675749b360a22df279 100644 (file)
@@ -324,40 +324,6 @@ ONYX_DEF(__net_recvfrom, (WASM_I32, WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WA
     return NULL;
 }
 
-ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), ()) {
-    #ifdef _BH_LINUX
-    int i, res, cursor;
-    struct pollfd* fds;
-
-    fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd));
-
-    for (i=0; i < params->data[1].of.i32; i++) {
-        fds[i].fd = *(i32 *) ONYX_PTR(params->data[0].of.i32 + 4 * i);
-        fds[i].events = POLLIN;
-        fds[i].revents = 0;
-    }
-
-    res = poll(fds, params->data[1].of.i32, params->data[2].of.i32);
-
-    for (i=0; i<params->data[1].of.i32; i++) {
-        *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 0; // NO_CHANGE
-
-        if (fds[i].revents & POLLIN) {
-            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 1; // READABLE
-        }
-
-        if ((fds[i].revents & POLLHUP)
-            || (fds[i].revents & POLLNVAL)
-            || (fds[i].revents & POLLERR)) {
-            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 2; // CLOSED
-        }
-    }
-
-    #endif
-
-    return NULL;
-}
-
 ONYX_DEF(__net_host_to_net_s, (WASM_I32), (WASM_I32)) {
     results->data[0] = WASM_I32_VAL(htons(params->data[0].of.i32));
     return NULL;
index 5f875e593b5dd1a57c0587a19dd7deeb611e3e4e..403f5b7017ebb2090940c60350112ddd15ac48c3 100644 (file)
@@ -53,6 +53,52 @@ ONYX_DEF(__time, (), (WASM_I64)) {
     return NULL;
 }
 
+// ([] PollDescription, timeout: i32) -> void
+// PollDescription :: struct { fd: i64; in_event: PollEvent; out_event: PollEvent; }
+ONYX_DEF(__poll, (WASM_I32, WASM_I32, WASM_I32), ()) {
+    #ifdef _BH_LINUX
+    struct pollfd* fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd));
+
+    for (int i=0; i < params->data[1].of.i32; i++) {
+        fds[i].fd = *(i64 *) ONYX_PTR(params->data[0].of.i32 + 16 * i);
+        fds[i].revents = 0;
+        fds[i].events = 0;
+
+        switch (*(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 8)) {
+            case 0x01: // Read
+                fds[i].events = POLLIN;
+                break;
+            
+            case 0x02: // Write
+                fds[i].events = POLLOUT;
+                break;
+        }
+    }
+
+    int res = poll(fds, params->data[1].of.i32, params->data[2].of.i32);
+
+    for (int i=0; i<params->data[1].of.i32; i++) {
+        *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 0; // NO_CHANGE
+
+        if (fds[i].revents & POLLIN) {
+            *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 1; // READABLE
+        }
+
+        if (fds[i].revents & POLLOUT) {
+            *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 2; // WRITABLE
+        }
+
+        if ((fds[i].revents & POLLHUP)
+            || (fds[i].revents & POLLNVAL)
+            || (fds[i].revents & POLLERR)) {
+            *(i32 *) ONYX_PTR(params->data[0].of.i32 + 16 * i + 12) = 3; // CLOSED
+        }
+    }
+    #endif
+
+    return NULL;
+}
+
 
 ONYX_DEF(__lookup_env, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WASM_I32)) {