split 'ReadPending' into 'ReadPending' and 'ReadLater'
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Wed, 1 Feb 2023 17:12:53 +0000 (11:12 -0600)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Wed, 1 Feb 2023 17:12:53 +0000 (11:12 -0600)
core/container/iter.onyx
core/io/io.onyx
core/io/reader.onyx
core/io/stream.onyx
core/net/net.onyx
core/runtime/onyx_run.onyx

index 93d63bd43ee8341b30dbc5ca06f6616a46369488..9a8875a5d9982b2b87d2e3162dc0b8ef5b729814 100644 (file)
@@ -491,11 +491,8 @@ concat :: (iters: ..Iterator($T)) -> Iterator(T) {
         // But I don't know what the semantics should be for specifying which
         // if any iterators to close.
         for^ iters {
-            it.close(it.data);
+            if it.close != null_proc do it.close(it.data);
         }
-
-        // memory.free_slice(^iters);
-        // cfree(c);
     }
 
     return .{
@@ -552,7 +549,6 @@ enumerate :: (it: Iterator($T), start_index: i32 = 0) -> Iterator(Enumeration_Va
 
     close :: (use data: ^Enumeration_Context($T)) {
         if iterator.close != null_proc do iterator.close(iterator.data);
-        // cfree(data);
     }
 
     return .{
@@ -955,7 +951,7 @@ generator_no_copy :: (ctx: ^$Ctx, gen: (^Ctx) -> ($T, bool), close: (^Ctx) -> vo
             thread_function(t_data, body);
 
             for^ threads do thread.join(it);
-            dist.close(dist.data);
+            if dist.close != null_proc do dist.close(dist.data);
         }
 
         thread_function :: (__data: ^$T, $body: Code) {
index 361852aaa1bd474a4c08bbc9005d9fb0ac85c715..bc92c60b87bc984068fd81aecb88290db388e7b6 100644 (file)
@@ -30,6 +30,9 @@ Error :: enum {
     // Not possible to unread.
     InvalidUnread  :: 0x09;
 
-    // When reading from a stream, no data was read.
+    // When reading from a stream, no data was read, but data will become available soon.
     ReadPending    :: 0x0a;
+
+    // When reading from a stream, no data was read, and data will not become available soon.
+    ReadLater      :: 0x0b;
 }
index 6ea84814bafd3f4ce19bded1cc9ba9ca2ab0953f..d39f3a3a316504547e0dd131783ad441fb1374b4 100644 (file)
@@ -1,33 +1,26 @@
 package core.io
 
+// Reader is a buffered reader over a Stream.
+// If you do not need a buffered reader, simply
+// use the stream_read functions to read from the
+// stream. Because Reader is buffered, there are
+// several things it can do that a non-buffered
+// reader could not, like 'read the next line',
+// or 'read until a period'.
+
 use core {memory, math, array, iter}
 
 Reader :: struct {
-    stream : ^Stream;
+    stream: ^Stream;
 
     buffer: [] u8;
     buffer_allocator: Allocator;
 
     start, end: u32; // The start and ending positions of the edges of the buffer "window".    
-
     last_byte: i32;
 
     error: Error;
-
-    done : bool; // If an .EOF was reached.
-
-    // If a .ReadPending was reached, still work with the data
-    // that is available. Only set this if you know that the data
-    // will always be available.
-    greedy : bool;
-
-    // A very special flag that should probably only be used on
-    // standard input (hence the name). This flag says that if there
-    // is ANY data left in the buffer, do not try to fill the buffer
-    // with a call to stream_read. This will produce incorrect results
-    // for things like read_line, but will prevent hanging when using
-    // a blocking read source like stdin.
-    stdin: bool;
+    done: bool; // If an .EOF was reached.
 }
 
 #inject Reader {
@@ -53,13 +46,12 @@ Reader :: struct {
     lines :: lines;
 }
 
-reader_make :: (s: ^Stream, buffer_size := 4096, allocator := context.allocator, greedy := false) -> Reader {
+reader_make :: (s: ^Stream, buffer_size := 4096, allocator := context.allocator) -> Reader {
     assert(s.vtable != null, "Stream vtable was not setup correctly.");
 
     reader: Reader;
     reader.stream = s;
     reader.error = .None;
-    reader.greedy = greedy;
 
     memory.alloc_slice(^reader.buffer, buffer_size, allocator);
     reader.buffer_allocator = allocator;
@@ -174,8 +166,10 @@ read_bytes :: (use reader: ^Reader, bytes: [] u8) -> (i32, Error) {
 
     write_index := 0;
     while n > 0 && !reader_empty(reader) {
-        if reader_read_next_chunk(reader) == .ReadPending {
-            return write_index, .ReadPending;
+        if  err := reader_read_next_chunk(reader);
+            err == .ReadPending || err == .ReadLater
+        {
+            return write_index, err;
         }
 
         to_write := math.min(n, end);
@@ -562,8 +556,8 @@ skip_bytes :: (use reader: ^Reader, bytes: u32) -> (skipped: i32, err: Error) {
 
 lines :: (r: ^Reader, inplace := false) =>
     iter.generator(^.{
-        r = r, inplace = inplace
-
+        r = r,
+        inplace = inplace
     }, (ctx: $C) -> (str, bool) {
         line := ctx.r->read_line(consume_newline=true, inplace=ctx.inplace);
 
@@ -604,7 +598,11 @@ lines :: (r: ^Reader, inplace := false) =>
         return .BufferFull;
     }
 
-    if end > 0 && stdin {
+    // If this stream will block on a read, use the data that is present
+    // in the buffer currently. In theory, it would be better to have a
+    // 'stream_poll' that tests if the next read would block, but that
+    // can happen later.
+    if end > 0 && stream.flags & .Block_On_Read {
         return .None;
     }
 
@@ -615,8 +613,7 @@ lines :: (r: ^Reader, inplace := false) =>
     // to be read, so this should try again later. This has to return None
     // if the end is equal to start because that means that the buffer is
     // completely empty.
-    if err == .ReadPending {
-        if greedy do err = .NoProgress;
+    if err == .ReadPending || err == .ReadLater {
         error = err;
         return err if end == 0 else .None;
     }
index 0a58a7a8e8f1fe2e6ac894c325849c677413bafa..5cbe23d6b0180899028405e23ef631e95a7fd172 100644 (file)
@@ -4,9 +4,17 @@ use core
 
 Stream :: struct {
     use vtable : ^Stream_Vtable;
+    flags: Stream_Flags;
+}
+
+// These flags are used by things like io.Reader as hints
+// for how the stream will behave under certain circumstances.
+Stream_Flags :: enum #flags {
+    // This flags signals that any call to stream_read *could*
+    // block for arbitrary amounts of time.
+    Block_On_Read;
 }
 
-// #package
 Stream_Vtable :: struct {
     seek         : (s: ^Stream, to: i32, whence: SeekFrom) -> Error            = null_proc;
     tell         : (s: ^Stream) -> (Error, u32)                                = null_proc;
index f36267eef5a43eedc08e905e2ced83844b8c7914..51d4d71978975d3d98ba05a6e243fa2ccca2e0f9 100644 (file)
@@ -9,7 +9,6 @@ Socket :: struct {
     handle: Handle;
     type: SocketType;
     family: SocketDomain;
-
 }
 
 // Inject methods for the socket
@@ -107,6 +106,7 @@ socket_create :: (domain: SocketDomain, type: SocketType) -> (Socket, SocketErro
     err := __net_create_socket(^s.handle, domain, type);
     if err == .None {
         s.vtable = ^__net_socket_vtable;
+        s.flags |= .Block_On_Read;
     }
 
     return s, err;
@@ -119,6 +119,11 @@ socket_close :: (s: ^Socket) {
 
 socket_setting :: (s: ^Socket, setting: SocketSetting, value: u32) {
     __net_setting(s.handle, setting, value);
+
+    if setting == .NonBlocking {
+        if value > 0 do s.flags = ~~ (cast(u32) s.flags & cast(u32) ~io.Stream_Flags.Block_On_Read);
+        else do         s.flags |= io.Stream_Flags.Block_On_Read;
+    }
 }
 
 socket_is_alive :: (s: ^Socket) -> bool {
@@ -185,7 +190,6 @@ socket_send :: (s: ^Socket, data: [] u8) -> i32 {
 
 socket_sendto :: (s: ^Socket, data: [] u8, addr: ^Socket_Address) -> i32 {
     sent := __net_sendto(s.handle, data, addr);
-    // if sent < 0 s.{ s.vtable = null; }
     return sent;
 }
 
@@ -234,11 +238,11 @@ socket_recvfrom :: (s: ^Socket, buffer: [] u8) -> (Socket_Address, i32) {
     return sa, received;
 }
 
-host_to_network :: #match {}
+host_to_network :: #match #local {}
 #match host_to_network (x: u16) => __net_host_to_net_s(x);
 #match host_to_network (x: u32) => __net_host_to_net_l(x);
 
-network_to_host :: #match {}
+network_to_host :: #match #local {}
 #match network_to_host (x: u16) => __net_net_to_host_s(x);
 #match network_to_host (x: u32) => __net_net_to_host_l(x);
 
@@ -250,7 +254,7 @@ network_to_host :: #match {}
         bytes_read := __net_recv(handle, buffer, ^would_block);
         if bytes_read < 0 && !would_block do s.vtable = null;
 
-        if would_block do return .ReadPending, bytes_read;
+        if would_block do return .ReadLater, bytes_read;
 
         return .None, bytes_read;
     },
index 3dd37d3bb120b93eca8bf2b15a6e255531c91aee..3a827aafb88be1763375cb5c9bd2cdc0f9f571b3 100644 (file)
@@ -58,7 +58,7 @@ __read_from_input :: (buffer: [] u8) -> i32 {
 
     __file_get_standard(0, ^fd);
     __stdin = .{
-        .{ ^fs.__file_stream_vtable },
+        .{ ^fs.__file_stream_vtable, .Block_On_Read },
         fd
     };