From: Brendan Hansen
Date: Sat, 3 Sep 2022 00:51:44 +0000 (-0500)
Subject: bugfixes in TCP server code
X-Git-Url: https://git.brendanfh.com/?a=commitdiff_plain;h=8e947b104272e1b7c32c3a95a5c21db95bea7194;p=onyx.git

bugfixes in TCP server code
---

diff --git a/core/net/net.onyx b/core/net/net.onyx
index b5072aae..9f0d9d57 100644
--- a/core/net/net.onyx
+++ b/core/net/net.onyx
@@ -141,22 +141,21 @@ socket_accept :: (s: ^Socket) -> (Socket, Socket_Address) {
     return new_socket, new_addr;
 }
 
-socket_poll_all :: (sockets: [] ^Socket, timeout := -1, changed_buffer: [] i32 = .[]) -> [] i32 {
+Socket_Poll_Status :: enum {
+    No_Change :: 0;
+    Readable  :: 1;
+    Closed    :: 2;
+}
+
+socket_poll_all :: (sockets: [] ^Socket, timeout := -1, stat_buff: [] Socket_Poll_Status = .[]) {
+    if sockets.count > stat_buff.count do return;
+
     handles := (cast(^Socket.Handle) alloc.from_stack(sockets.count * sizeof Socket.Handle))[0 .. sockets.count];
     for i: sockets.count {
         handles[i] = sockets[i].handle;
     }
 
-    handles_changed := cast(^i32) alloc.from_stack(sockets.count * sizeof i32);
-    num_changed := __net_poll_recv(handles, timeout, handles_changed);
-
-    if changed_buffer.count == 0 do return .[];
-    assert(changed_buffer.count >= num_changed, "Not enough space to write back changed sockets.");
-
-    changed_buffer.count = num_changed;
-    memory.copy(changed_buffer.data, handles_changed, num_changed * sizeof i32);
-
-    return changed_buffer;
+    __net_poll_recv(handles, timeout, stat_buff.data);
 }
 
 socket_send :: (s: ^Socket, data: [] u8) -> i32 {
@@ -272,7 +271,7 @@ network_to_host :: #match {}
     #package __net_sendto    :: (handle: Socket.Handle, data: [] u8, addr: ^Socket_Address) -> i32 ---
     #package __net_recv      :: (handle: Socket.Handle, data: [] u8, async_would_block: ^bool) -> i32 ---
     #package __net_recvfrom  :: (handle: Socket.Handle, data: [] u8, out_recv_addr: ^Socket_Address, async_would_block: ^bool) -> i32 ---
-    #package __net_poll_recv :: (handle: [] Socket.Handle, timeout: i32, out_recv_indicies: ^i32) -> i32 ---
+    #package __net_poll_recv :: (handle: [] Socket.Handle, timeout: i32, out_statuses: ^Socket_Poll_Status) -> void ---
 
     #package __net_host_to_net_s :: (s: u16) -> u16 ---
     #package __net_host_to_net_l :: (s: u32) -> u32 ---
diff --git a/core/net/tcp.onyx b/core/net/tcp.onyx
index b7da3ead..f57da68d 100644
--- a/core/net/tcp.onyx
+++ b/core/net/tcp.onyx
@@ -33,6 +33,7 @@ TCP_Event :: struct {
         Connection;
         Disconnection;
         Data;
+        Ready;
     }
 
     Connection :: struct {
@@ -56,6 +57,12 @@ TCP_Event :: struct {
 
         contents: [] u8;
     }
+
+    Ready :: struct {
+        address: ^Socket_Address;
+        // This is only set when the event is coming from the server.
+        client  : ^TCP_Server.Client;
+    }
 }
 
 tcp_get_events :: (use conn: ^TCP_Connection) -> Iterator(TCP_Event) {
@@ -102,8 +109,11 @@ TCP_Server :: struct {
         address : Socket_Address;
         state   : State;
 
+        recv_ready_event_present := false;
+
         State :: enum {
             Alive;
+            Being_Killed;
             Dying;
             Dead;
         }
@@ -119,12 +129,15 @@ TCP_Server :: struct {
     alive := true;
     pulse_time_ms := 500;
 
+    emit_data_events := true;
+
     get_events    :: tcp_get_events
     listen        :: tcp_server_listen
     pulse         :: tcp_server_pulse
     send          :: tcp_server_send
     broadcast     :: tcp_server_broadcast
     handle_events :: tcp_server_handle_events
+    kill_client   :: tcp_server_kill_client
 }
 
 tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_Server {
@@ -145,6 +158,11 @@ tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_S
 
 #local tcp_server_listener :: (use server: ^TCP_Server) {
     while server.alive {
+        if server.client_count == server.clients.count {
+            os.sleep(100);
+            continue;
+        }
+
         client_socket, client_addr := socket->accept();
 
         client := new(TCP_Server.Client, allocator=client_allocator);
@@ -176,11 +194,27 @@ tcp_server_pulse :: (use server: ^TCP_Server) -> bool {
     for^ clients {
         client := *it;
         if client == null do continue;
-        if client.state == .Dying {
-            // (*it).state = .Dead;
-            raw_free(server.client_allocator, client);
-            *it = null;
-            server.client_count -= 1;
+        switch client.state {
+            case .Being_Killed {
+                // Before, there was not a "being killed" state and the code made
+                // a lot more sense. This was because the socket was read from
+                // directly inside of this codebase. Now, you can opt into
+                // receiving "Ready" events, which allows you to handle the socket's
+                // data however you wish. In doing this, you have the ability
+                // to force kill the client's connection using server->kill_client().
+                // The problem with immediately placing the client in the Dying state
+                // is that this code is run first, which will remove the client. Then,
+                // the loop below that checks for dying clients will never see the
+                // dying client. To remedy this, "Being_Killed" was added as another
+                // shutdown phase. TLDR: This is a hack; refactor this.
+                client.state = .Dying;
+            }
+
+            case .Dying {
+                raw_free(server.client_allocator, client);
+                *it = null;
+                server.client_count -= 1;
+            }
         }
     }
 
@@ -197,29 +231,38 @@ tcp_server_pulse :: (use server: ^TCP_Server) -> bool {
     for clients_with_messages {
         if it.state != .Alive do continue;
 
-        msg_buffer: [1024] u8;
-        bytes_read := it.socket->recv_into(msg_buffer);
+        if server.emit_data_events {
+            msg_buffer: [1024] u8;
+            bytes_read := it.socket->recv_into(msg_buffer);
+
+            // If exactly 0 bytes are read from the buffer, it means that the
+            // client has shutdown and future communication should be terminated.
+            //
+            // If a negative number of bytes are read, then an error has occured
+            // and the client should also be marked as dead.
+            if bytes_read <= 0 {
+                tcp_server_kill_client(server, it);
+                continue;
+            }
 
-        // If exactly 0 bytes are read from the buffer, it means that the
-        // client has shutdown and future communication should be terminated.
-        //
-        // If a negative number of bytes are read, then an error has occured
-        // and the client should also be marked as dead.
-        if bytes_read <= 0 {
-            tcp_server_kill_client(server, it);
-            continue;
+            data_event := new(TCP_Event.Data, allocator=server.event_allocator);
+            data_event.client = it;
+            data_event.address = ^it.address;
+            data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
+            server.events << .{ .Data, data_event }; @Threading // See comment above.
+
+        } elseif !it.recv_ready_event_present {
+            it.recv_ready_event_present = true;
+            ready_event := new(TCP_Event.Ready, allocator=server.event_allocator);
+            ready_event.client = it;
+            ready_event.address = ^it.address;
+            server.events << .{ .Ready, ready_event }; @Threading // See comment above.
         }
-
-        data_event := new(TCP_Event.Data, allocator=server.event_allocator);
-        data_event.client = it;
-        data_event.address = ^it.address;
-        data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
-        server.events << .{ .Data, data_event }; @Threading // See comment above.
     }
 
     for clients {
         if it == null do continue;
-        if it.state != .Alive {
+        if it.state == .Dying {
             disconnect_event := new(TCP_Event.Disconnection, allocator=server.event_allocator);
             disconnect_event.client = it;
             disconnect_event.address = ^it.address;
@@ -261,6 +304,13 @@ tcp_server_handle_events :: macro (server: ^TCP_Server, handler: Code) {
     }
 }
 
+tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
+    client.state = .Being_Killed;
+    client.socket->close();
+    client.socket.vtable = null;
+}
+
+
 //
 // TCP Client
 
@@ -275,33 +325,33 @@ TCP_Client :: struct {
 
 
 
-#local {
-    tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
-        client.state = .Dying;
-        client.socket.vtable = null;
-    }
-
-    wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
-        active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
-        active_clients.count = 0;
+#local
+wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
+    active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
+    active_clients.count = 0;
 
-        for clients {
-            if it == null do continue;
+    for clients {
+        if it == null do continue;
 
-            if it.state == .Alive {
-                active_clients[active_clients.count] = it;
-                active_clients.count += 1;
-            }
+        if it.state == .Alive {
+            active_clients[active_clients.count] = it;
+            active_clients.count += 1;
         }
+    }
 
-        changed_buffer := alloc.array_from_stack(i32, client_count);
-        changed := socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, changed_buffer);
+    status_buffer := alloc.array_from_stack(core.net.Socket_Poll_Status, client_count);
+    socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, status_buffer);
 
-        recv_clients: [..] ^TCP_Server.Client;
-        for changed {
+    recv_clients: [..] ^TCP_Server.Client;
+    for it: client_count {
+        if status_buffer[it] == .Readable {
             recv_clients << active_clients[it];
         }
 
-        return recv_clients;
+        if status_buffer[it] == .Closed {
+            tcp_server_kill_client(server, active_clients[it]);
+        }
     }
+
+    return recv_clients;
 }
diff --git a/core/string.onyx b/core/string.onyx
index c4db484d..f3beb1d2 100644
--- a/core/string.onyx
+++ b/core/string.onyx
@@ -398,7 +398,7 @@ read_until :: (s: ^str, upto: str, skip := 0) -> str {
         s.count = 0;
 
     } else {
-        out.count = i - 1;
+        out.count = i;
         s.data += out.count + (upto.count - 1);
         s.count -= out.count + (upto.count - 1);
     }
diff --git a/src/onyx_runtime.c b/src/onyx_runtime.c
index 893e6989..c1fac328 100644
--- a/src/onyx_runtime.c
+++ b/src/onyx_runtime.c
@@ -1255,30 +1255,35 @@ ONYX_DEF(__net_recvfrom, (WASM_I32, WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WA
     return NULL;
 }
 
-ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WASM_I32)) {
+ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), ()) {
     #ifdef _BH_LINUX
     int i, res, cursor;
     struct pollfd* fds;
 
-    fds = alloca(params->data[1].of.i32 * 8); // Guessed size of pollfd
+    fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd)); // Guessed size of pollfd
 
     for (i=0; i < params->data[1].of.i32; i++) {
         fds[i].fd = *(i32 *) ONYX_PTR(params->data[0].of.i32 + 4 * i);
-        fds[i].events = POLLIN;
+        fds[i].events = -1;
         fds[i].revents = 0;
     }
 
     res = poll(fds, params->data[1].of.i32, params->data[2].of.i32);
 
-    cursor = 0;
     for (i=0; i < params->data[1].of.i32; i++) {
+        *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 0; // NO_CHANGE
+
         if (fds[i].revents & POLLIN) {
-            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * cursor) = i;
-            cursor++;
+            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 1; // READABLE
+        }
+
+        if ((fds[i].revents & POLLHUP)
+            || (fds[i].revents & POLLNVAL)
+            || (fds[i].revents & POLLERR)) {
+            *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 2; // CLOSED
         }
     }
 
-    results->data[0] = WASM_I32_VAL(cursor);
     #endif
 
     return NULL;
diff --git a/src/symres.c b/src/symres.c
index 16206dad..ea2c400b 100644
--- a/src/symres.c
+++ b/src/symres.c
@@ -1038,8 +1038,6 @@ SymresStatus symres_function_header(AstFunction* func) {
     if (func->scope == NULL)
        func->scope = scope_create(context.ast_alloc, curr_scope, func->token->pos);
 
-    scope_enter(func->scope);
-
     if (func->constraints.constraints != NULL && func->constraints.constraints_met == 0) {
         bh_arr_each(AstConstraint *, constraint, func->constraints.constraints) {
             SYMRES(constraint, *constraint);
@@ -1050,6 +1048,8 @@ SymresStatus symres_function_header(AstFunction* func) {
         return Symres_Success;
     }
 
+    scope_enter(func->scope);
+
     bh_arr_each(AstParam, param, func->params) {
         if (param->default_value != NULL) {
             SYMRES(expression, &param->default_value);
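
For illustration, a minimal sketch (not part of the commit above) of how the reworked socket_poll_all might be consumed after this change. The procedure now writes one Socket_Poll_Status per input socket into a caller-supplied buffer, and returns early if the buffer is shorter than the socket slice, instead of returning a packed list of changed indices. The sketch is written as if it lived alongside core/net (so names are unqualified, as in tcp.onyx); handle_readable and handle_closed are hypothetical, application-defined procedures.

// Sketch: consuming the per-socket poll statuses introduced by this commit.
// handle_readable / handle_closed are hypothetical placeholders.
poll_example :: (sockets: [] ^Socket) {
    // The status buffer must be at least as long as the socket slice;
    // socket_poll_all returns without polling otherwise.
    statuses := alloc.array_from_stack(Socket_Poll_Status, sockets.count);

    // The timeout is forwarded to poll(2) on Linux, so it is in milliseconds
    // and a negative value blocks until something changes.
    socket_poll_all(sockets, 500, statuses);

    for i: sockets.count {
        if statuses[i] == .Readable {
            handle_readable(sockets[i]);    // hypothetical
        }

        if statuses[i] == .Closed {
            handle_closed(sockets[i]);      // hypothetical
        }
    }
}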
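
The tcp.onyx changes also add an opt-in mode: with emit_data_events turned off, the server stops calling recv_into itself and instead queues at most one .Ready event per readable client (tracked by recv_ready_event_present), leaving the actual read and any forced disconnect to the application. A rough sketch of that flow, under the same caveats as above; read_client_data stands in for application-defined reading logic.

// Sketch of the opt-in "Ready" flow; not part of the commit.
// read_client_data is a hypothetical, application-defined procedure.
use_ready_events :: (server: ^TCP_Server) {
    server.emit_data_events = false;    // queue TCP_Event.Ready instead of TCP_Event.Data
}

handle_ready :: (server: ^TCP_Server, client: ^TCP_Server.Client) {
    still_open := read_client_data(client.socket);   // hypothetical; drains the socket itself

    if !still_open {
        // Force the disconnect: the client moves through Being_Killed and Dying
        // on subsequent pulses, emitting a Disconnection event before it is freed.
        server->kill_client(client);
    }
}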