From c1c3d28898b39df68e7832834e5a35366ed1ce12 Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Wed, 1 Feb 2023 11:12:53 -0600 Subject: [PATCH] split 'ReadPending' into 'ReadPending' and 'ReadLater' --- core/container/iter.onyx | 8 ++----- core/io/io.onyx | 5 +++- core/io/reader.onyx | 49 ++++++++++++++++++-------------------- core/io/stream.onyx | 10 +++++++- core/net/net.onyx | 14 +++++++---- core/runtime/onyx_run.onyx | 2 +- 6 files changed, 48 insertions(+), 40 deletions(-) diff --git a/core/container/iter.onyx b/core/container/iter.onyx index 93d63bd4..9a8875a5 100644 --- a/core/container/iter.onyx +++ b/core/container/iter.onyx @@ -491,11 +491,8 @@ concat :: (iters: ..Iterator($T)) -> Iterator(T) { // But I don't know what the semantics should be for specifying which // if any iterators to close. for^ iters { - it.close(it.data); + if it.close != null_proc do it.close(it.data); } - - // memory.free_slice(^iters); - // cfree(c); } return .{ @@ -552,7 +549,6 @@ enumerate :: (it: Iterator($T), start_index: i32 = 0) -> Iterator(Enumeration_Va close :: (use data: ^Enumeration_Context($T)) { if iterator.close != null_proc do iterator.close(iterator.data); - // cfree(data); } return .{ @@ -955,7 +951,7 @@ generator_no_copy :: (ctx: ^$Ctx, gen: (^Ctx) -> ($T, bool), close: (^Ctx) -> vo thread_function(t_data, body); for^ threads do thread.join(it); - dist.close(dist.data); + if dist.close != null_proc do dist.close(dist.data); } thread_function :: (__data: ^$T, $body: Code) { diff --git a/core/io/io.onyx b/core/io/io.onyx index 361852aa..bc92c60b 100644 --- a/core/io/io.onyx +++ b/core/io/io.onyx @@ -30,6 +30,9 @@ Error :: enum { // Not possible to unread. InvalidUnread :: 0x09; - // When reading from a stream, no data was read. + // When reading from a stream, no data was read, but data will become available soon. ReadPending :: 0x0a; + + // When reading from a stream, no data was read, and data will not become available soon. + ReadLater :: 0x0b; } diff --git a/core/io/reader.onyx b/core/io/reader.onyx index 6ea84814..d39f3a3a 100644 --- a/core/io/reader.onyx +++ b/core/io/reader.onyx @@ -1,33 +1,26 @@ package core.io +// Reader is a buffered reader over a Stream. +// If you do not need a buffered reader, simply +// use the stream_read functions to read from the +// stream. Because Reader is buffered, there are +// several things it can do that a non-buffered +// reader could not, like 'read the next line', +// or 'read until a period'. + use core {memory, math, array, iter} Reader :: struct { - stream : ^Stream; + stream: ^Stream; buffer: [] u8; buffer_allocator: Allocator; start, end: u32; // The start and ending positions of the edges of the buffer "window". - last_byte: i32; error: Error; - - done : bool; // If an .EOF was reached. - - // If a .ReadPending was reached, still work with the data - // that is available. Only set this if you know that the data - // will always be available. - greedy : bool; - - // A very special flag that should probably only be used on - // standard input (hence the name). This flag says that if there - // is ANY data left in the buffer, do not try to fill the buffer - // with a call to stream_read. This will produce incorrect results - // for things like read_line, but will prevent hanging when using - // a blocking read source like stdin. - stdin: bool; + done: bool; // If an .EOF was reached. } #inject Reader { @@ -53,13 +46,12 @@ Reader :: struct { lines :: lines; } -reader_make :: (s: ^Stream, buffer_size := 4096, allocator := context.allocator, greedy := false) -> Reader { +reader_make :: (s: ^Stream, buffer_size := 4096, allocator := context.allocator) -> Reader { assert(s.vtable != null, "Stream vtable was not setup correctly."); reader: Reader; reader.stream = s; reader.error = .None; - reader.greedy = greedy; memory.alloc_slice(^reader.buffer, buffer_size, allocator); reader.buffer_allocator = allocator; @@ -174,8 +166,10 @@ read_bytes :: (use reader: ^Reader, bytes: [] u8) -> (i32, Error) { write_index := 0; while n > 0 && !reader_empty(reader) { - if reader_read_next_chunk(reader) == .ReadPending { - return write_index, .ReadPending; + if err := reader_read_next_chunk(reader); + err == .ReadPending || err == .ReadLater + { + return write_index, err; } to_write := math.min(n, end); @@ -562,8 +556,8 @@ skip_bytes :: (use reader: ^Reader, bytes: u32) -> (skipped: i32, err: Error) { lines :: (r: ^Reader, inplace := false) => iter.generator(^.{ - r = r, inplace = inplace - + r = r, + inplace = inplace }, (ctx: $C) -> (str, bool) { line := ctx.r->read_line(consume_newline=true, inplace=ctx.inplace); @@ -604,7 +598,11 @@ lines :: (r: ^Reader, inplace := false) => return .BufferFull; } - if end > 0 && stdin { + // If this stream will block on a read, use the data that is present + // in the buffer currently. In theory, it would be better to have a + // 'stream_poll' that tests if the next read would block, but that + // can happen later. + if end > 0 && stream.flags & .Block_On_Read { return .None; } @@ -615,8 +613,7 @@ lines :: (r: ^Reader, inplace := false) => // to be read, so this should try again later. This has to return None // if the end is equal to start because that means that the buffer is // completely empty. - if err == .ReadPending { - if greedy do err = .NoProgress; + if err == .ReadPending || err == .ReadLater { error = err; return err if end == 0 else .None; } diff --git a/core/io/stream.onyx b/core/io/stream.onyx index 0a58a7a8..5cbe23d6 100644 --- a/core/io/stream.onyx +++ b/core/io/stream.onyx @@ -4,9 +4,17 @@ use core Stream :: struct { use vtable : ^Stream_Vtable; + flags: Stream_Flags; +} + +// These flags are used by things like io.Reader as hints +// for how the stream will behave under certain circumstances. +Stream_Flags :: enum #flags { + // This flags signals that any call to stream_read *could* + // block for arbitrary amounts of time. + Block_On_Read; } -// #package Stream_Vtable :: struct { seek : (s: ^Stream, to: i32, whence: SeekFrom) -> Error = null_proc; tell : (s: ^Stream) -> (Error, u32) = null_proc; diff --git a/core/net/net.onyx b/core/net/net.onyx index f36267ee..51d4d719 100644 --- a/core/net/net.onyx +++ b/core/net/net.onyx @@ -9,7 +9,6 @@ Socket :: struct { handle: Handle; type: SocketType; family: SocketDomain; - } // Inject methods for the socket @@ -107,6 +106,7 @@ socket_create :: (domain: SocketDomain, type: SocketType) -> (Socket, SocketErro err := __net_create_socket(^s.handle, domain, type); if err == .None { s.vtable = ^__net_socket_vtable; + s.flags |= .Block_On_Read; } return s, err; @@ -119,6 +119,11 @@ socket_close :: (s: ^Socket) { socket_setting :: (s: ^Socket, setting: SocketSetting, value: u32) { __net_setting(s.handle, setting, value); + + if setting == .NonBlocking { + if value > 0 do s.flags = ~~ (cast(u32) s.flags & cast(u32) ~io.Stream_Flags.Block_On_Read); + else do s.flags |= io.Stream_Flags.Block_On_Read; + } } socket_is_alive :: (s: ^Socket) -> bool { @@ -185,7 +190,6 @@ socket_send :: (s: ^Socket, data: [] u8) -> i32 { socket_sendto :: (s: ^Socket, data: [] u8, addr: ^Socket_Address) -> i32 { sent := __net_sendto(s.handle, data, addr); - // if sent < 0 s.{ s.vtable = null; } return sent; } @@ -234,11 +238,11 @@ socket_recvfrom :: (s: ^Socket, buffer: [] u8) -> (Socket_Address, i32) { return sa, received; } -host_to_network :: #match {} +host_to_network :: #match #local {} #match host_to_network (x: u16) => __net_host_to_net_s(x); #match host_to_network (x: u32) => __net_host_to_net_l(x); -network_to_host :: #match {} +network_to_host :: #match #local {} #match network_to_host (x: u16) => __net_net_to_host_s(x); #match network_to_host (x: u32) => __net_net_to_host_l(x); @@ -250,7 +254,7 @@ network_to_host :: #match {} bytes_read := __net_recv(handle, buffer, ^would_block); if bytes_read < 0 && !would_block do s.vtable = null; - if would_block do return .ReadPending, bytes_read; + if would_block do return .ReadLater, bytes_read; return .None, bytes_read; }, diff --git a/core/runtime/onyx_run.onyx b/core/runtime/onyx_run.onyx index 3dd37d3b..3a827aaf 100644 --- a/core/runtime/onyx_run.onyx +++ b/core/runtime/onyx_run.onyx @@ -58,7 +58,7 @@ __read_from_input :: (buffer: [] u8) -> i32 { __file_get_standard(0, ^fd); __stdin = .{ - .{ ^fs.__file_stream_vtable }, + .{ ^fs.__file_stream_vtable, .Block_On_Read }, fd }; -- 2.25.1