bugfixes in TCP server code
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Sat, 3 Sep 2022 00:51:44 +0000 (19:51 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Sat, 3 Sep 2022 00:51:44 +0000 (19:51 -0500)
core/net/net.onyx
core/net/tcp.onyx
core/string.onyx
src/onyx_runtime.c
src/symres.c

index b5072aae2d60e6418fa6542445c1f5a0c987f878..9f0d9d578e201af4f8129b1836c8f37e754f41a1 100644 (file)
@@ -141,22 +141,21 @@ socket_accept :: (s: ^Socket) -> (Socket, Socket_Address) {
     return new_socket, new_addr;
 }
 
-socket_poll_all :: (sockets: [] ^Socket, timeout := -1, changed_buffer: [] i32 = .[]) -> [] i32 {
+Socket_Poll_Status :: enum {
+    No_Change :: 0;
+    Readable  :: 1;
+    Closed    :: 2;
+}
+
+socket_poll_all :: (sockets: [] ^Socket, timeout := -1, stat_buff: [] Socket_Poll_Status = .[]) {
+    if sockets.count > stat_buff.count do return;
+
     handles := (cast(^Socket.Handle) alloc.from_stack(sockets.count * sizeof Socket.Handle))[0 .. sockets.count];
     for i: sockets.count {
         handles[i] = sockets[i].handle;
     }
 
-    handles_changed := cast(^i32) alloc.from_stack(sockets.count * sizeof i32);
-    num_changed := __net_poll_recv(handles, timeout, handles_changed);
-
-    if changed_buffer.count == 0 do return .[];
-    assert(changed_buffer.count >= num_changed, "Not enough space to write back changed sockets."); 
-
-    changed_buffer.count = num_changed;
-    memory.copy(changed_buffer.data, handles_changed, num_changed * sizeof i32);
-
-    return changed_buffer;
+    __net_poll_recv(handles, timeout, stat_buff.data);
 }
 
 socket_send :: (s: ^Socket, data: [] u8) -> i32 {
@@ -272,7 +271,7 @@ network_to_host :: #match {}
     #package __net_sendto        :: (handle: Socket.Handle, data: [] u8, addr: ^Socket_Address)  -> i32 ---
     #package __net_recv          :: (handle: Socket.Handle, data: [] u8, async_would_block: ^bool) -> i32 ---
     #package __net_recvfrom      :: (handle: Socket.Handle, data: [] u8, out_recv_addr: ^Socket_Address, async_would_block: ^bool) -> i32 ---
-    #package __net_poll_recv     :: (handle: [] Socket.Handle, timeout: i32, out_recv_indicies: ^i32) -> i32 ---
+    #package __net_poll_recv     :: (handle: [] Socket.Handle, timeout: i32, out_statuses: ^Socket_Poll_Status) -> void ---
 
     #package __net_host_to_net_s :: (s: u16) -> u16 ---
     #package __net_host_to_net_l :: (s: u32) -> u32 ---
index b7da3ead8f2a0b95f96ffff3276ae00663059b33..f57da68daf9a3958190c351eec912ec1f538a804 100644 (file)
@@ -33,6 +33,7 @@ TCP_Event :: struct {
         Connection;
         Disconnection;
         Data;
+        Ready;
     }
 
     Connection :: struct {
@@ -56,6 +57,12 @@ TCP_Event :: struct {
 
         contents: [] u8;
     }
+
+    Ready :: struct {
+        address: ^Socket_Address;
+        // This is only set when the event is coming from the server.
+        client : ^TCP_Server.Client;
+    }
 }
 
 tcp_get_events :: (use conn: ^TCP_Connection) -> Iterator(TCP_Event) {
@@ -102,8 +109,11 @@ TCP_Server :: struct {
         address : Socket_Address;
         state   : State;
 
+        recv_ready_event_present := false;
+
         State :: enum {
             Alive;
+            Being_Killed;
             Dying;
             Dead;
         }
@@ -119,12 +129,15 @@ TCP_Server :: struct {
     alive         := true;
     pulse_time_ms := 500;
 
+    emit_data_events := true;
+
     get_events    :: tcp_get_events
     listen        :: tcp_server_listen
     pulse         :: tcp_server_pulse
     send          :: tcp_server_send
     broadcast     :: tcp_server_broadcast
     handle_events :: tcp_server_handle_events
+    kill_client   :: tcp_server_kill_client
 }
 
 tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_Server {
@@ -145,6 +158,11 @@ tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_S
 
 #local tcp_server_listener :: (use server: ^TCP_Server) {
     while server.alive {
+        if server.client_count == server.clients.count {
+            os.sleep(100);
+            continue;
+        }
+
         client_socket, client_addr := socket->accept();
 
         client := new(TCP_Server.Client, allocator=client_allocator);
@@ -176,11 +194,27 @@ tcp_server_pulse :: (use server: ^TCP_Server) -> bool {
     for^ clients {
         client := *it;
         if client == null do continue;
-        if client.state == .Dying {
-            // (*it).state = .Dead;
-            raw_free(server.client_allocator, client);
-            *it = null;
-            server.client_count -= 1;
+        switch client.state {
+            case .Being_Killed {
+                // Before, there was not a "being killed" state and the code made
+                // a lot more sense. This was because the socket was read from
+                // directly inside of this codebase. Now, you can opt into
+                // receiving "Ready" events, which allows you to handle the socket's
+                // data however you wish. In doing this, you have the ability
+                // to force kill the client's connection using server->kill_client().
+                // The problem with immediately placing the client in the Dying state
+                // is that this code is run first, which will remove the client. Then,
+                // the loop below that checks for dying clients will never see the
+                // dying client. To remedy this, "Being_Killed" was added as another
+                // shutdown phase. TLDR: This is a hack; refactor this.
+                client.state = .Dying;
+            }
+
+            case .Dying {
+                raw_free(server.client_allocator, client);
+                *it = null;
+                server.client_count -= 1;
+            }
         }
     }
 
@@ -197,29 +231,38 @@ tcp_server_pulse :: (use server: ^TCP_Server) -> bool {
     for clients_with_messages {
         if it.state != .Alive do continue;
 
-        msg_buffer: [1024] u8;
-        bytes_read := it.socket->recv_into(msg_buffer);
+        if server.emit_data_events {
+            msg_buffer: [1024] u8;
+            bytes_read := it.socket->recv_into(msg_buffer);
+
+            // If exactly 0 bytes are read from the buffer, it means that the
+            // client has shutdown and future communication should be terminated.
+            //
+            // If a negative number of bytes are read, then an error has occured
+            // and the client should also be marked as dead.
+            if bytes_read <= 0 {
+                tcp_server_kill_client(server, it);
+                continue;
+            }
 
-        // If exactly 0 bytes are read from the buffer, it means that the
-        // client has shutdown and future communication should be terminated.
-        //
-        // If a negative number of bytes are read, then an error has occured
-        // and the client should also be marked as dead.
-        if bytes_read <= 0 {
-            tcp_server_kill_client(server, it);
-            continue;
+            data_event := new(TCP_Event.Data, allocator=server.event_allocator);
+            data_event.client  = it;
+            data_event.address = ^it.address;
+            data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
+            server.events << .{ .Data, data_event }; @Threading // See comment above.
+
+        } elseif !it.recv_ready_event_present {
+            it.recv_ready_event_present = true;
+            ready_event := new(TCP_Event.Ready, allocator=server.event_allocator);
+            ready_event.client  = it;
+            ready_event.address = ^it.address;
+            server.events << .{ .Ready, ready_event }; @Threading // See comment above.
         }
-
-        data_event := new(TCP_Event.Data, allocator=server.event_allocator);
-        data_event.client  = it;
-        data_event.address = ^it.address;
-        data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
-        server.events << .{ .Data, data_event }; @Threading // See comment above.
     }
 
     for clients {
         if it == null do continue;
-        if it.state != .Alive {
+        if it.state == .Dying {
             disconnect_event := new(TCP_Event.Disconnection, allocator=server.event_allocator);
             disconnect_event.client  = it;
             disconnect_event.address = ^it.address;
@@ -261,6 +304,13 @@ tcp_server_handle_events :: macro (server: ^TCP_Server, handler: Code) {
     }
 }
 
+tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
+    client.state = .Being_Killed;
+    client.socket->close();
+    client.socket.vtable = null;
+}
+
+
 
 //
 // TCP Client
@@ -275,33 +325,33 @@ TCP_Client :: struct {
 
 
 
-#local {
-    tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
-        client.state = .Dying;
-        client.socket.vtable = null;
-    }
-
-    wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
-        active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
-        active_clients.count = 0;
+#local
+wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
+    active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
+    active_clients.count = 0;
 
-        for clients {
-            if it == null do continue;
+    for clients {
+        if it == null do continue;
 
-            if it.state == .Alive {
-                active_clients[active_clients.count] = it;
-                active_clients.count += 1;
-            }
+        if it.state == .Alive {
+            active_clients[active_clients.count] = it;
+            active_clients.count += 1;
         }
+    }
 
-        changed_buffer := alloc.array_from_stack(i32, client_count);
-        changed := socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, changed_buffer);
+    status_buffer := alloc.array_from_stack(core.net.Socket_Poll_Status, client_count);
+    socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, status_buffer);
 
-        recv_clients: [..] ^TCP_Server.Client;
-        for changed {
+    recv_clients: [..] ^TCP_Server.Client;
+    for it: client_count {
+        if status_buffer[it] == .Readable {
             recv_clients << active_clients[it];
         }
 
-        return recv_clients;
+        if status_buffer[it] == .Closed {
+            tcp_server_kill_client(server, active_clients[it]);
+        }
     }
+
+    return recv_clients;
 }
index c4db484d4e8c95e1f7b56d4ecf214c3723692fd5..f3beb1d2e8500210979dea72a1f210c081f0e0b3 100644 (file)
@@ -398,7 +398,7 @@ read_until :: (s: ^str, upto: str, skip := 0) -> str {
         s.count  = 0;
 
     } else {
-        out.count = i - 1;
+        out.count = i;
         s.data  += out.count + (upto.count - 1);
         s.count -= out.count + (upto.count - 1);
     }
index 893e698956b89a13fbbeb0b90d7d0e37c8527c6c..c1fac328b51274fb57e957acb5c2c135f1108d8b 100644 (file)
@@ -1255,30 +1255,35 @@ ONYX_DEF(__net_recvfrom, (WASM_I32, WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WA
     return NULL;
 }
 
-ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WASM_I32)) {
+ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), ()) {
     #ifdef _BH_LINUX
     int i, res, cursor;
     struct pollfd* fds;
 
-    fds = alloca(params->data[1].of.i32 * 8); // Guessed size of pollfd
+    fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd)); // Guessed size of pollfd
 
     for (i=0; i<params->data[1].of.i32; i++) {
         fds[i].fd = *(i32 *) ONYX_PTR(params->data[0].of.i32 + 4 * i);
-        fds[i].events = POLLIN;
+        fds[i].events = -1;
         fds[i].revents = 0;
     }
 
     res = poll(fds, params->data[1].of.i32, params->data[2].of.i32);
 
-    cursor = 0;
     for (i=0; i<params->data[1].of.i32; i++) {
+        *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 0; // NO_CHANGE
+
         if (fds[i].revents & POLLIN) {
-            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * cursor) = i;
-            cursor++;
+            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 1; // READABLE
+        }
+
+        if ((fds[i].revents & POLLHUP)
+            || (fds[i].revents & POLLNVAL)
+            || (fds[i].revents & POLLERR)) {
+            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 2; // CLOSED
         }
     }
 
-    results->data[0] = WASM_I32_VAL(cursor);
     #endif
 
     return NULL;
index 16206daded9ef79e20ecb03faaa26234aebe3b7d..ea2c400b79898beffd4b88ec853b6f06e6483865 100644 (file)
@@ -1038,8 +1038,6 @@ SymresStatus symres_function_header(AstFunction* func) {
     if (func->scope == NULL)
         func->scope = scope_create(context.ast_alloc, curr_scope, func->token->pos);
 
-    scope_enter(func->scope);
-
     if (func->constraints.constraints != NULL && func->constraints.constraints_met == 0) {
         bh_arr_each(AstConstraint *, constraint, func->constraints.constraints) {
             SYMRES(constraint, *constraint);
@@ -1050,6 +1048,8 @@ SymresStatus symres_function_header(AstFunction* func) {
         return Symres_Success;
     }
 
+    scope_enter(func->scope);
+
     bh_arr_each(AstParam, param, func->params) {
         if (param->default_value != NULL) {
             SYMRES(expression, &param->default_value);