return new_socket, new_addr;
}
-socket_poll_all :: (sockets: [] ^Socket, timeout := -1, changed_buffer: [] i32 = .[]) -> [] i32 {
+Socket_Poll_Status :: enum {
+ No_Change :: 0;
+ Readable :: 1;
+ Closed :: 2;
+}
+
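+// Example usage (a sketch; assumes `sockets: [] ^Socket` is already
+// populated and a 500ms timeout is acceptable):
+//
+//     statuses := alloc.array_from_stack(Socket_Poll_Status, sockets.count);
+//     socket_poll_all(sockets, 500, statuses);
+//
+//     for i: sockets.count {
+//         if statuses[i] == .Readable {
+//             // sockets[i] has data waiting; read it with recv/recv_into.
+//         }
+//         if statuses[i] == .Closed {
+//             // sockets[i] hung up or errored; close and discard it.
+//         }
+//     }
+//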
+socket_poll_all :: (sockets: [] ^Socket, timeout := -1, stat_buff: [] Socket_Poll_Status = .[]) {
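+ // Bail out unless a status can be reported for every socket.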
+ if sockets.count > stat_buff.count do return;
+
handles := (cast(^Socket.Handle) alloc.from_stack(sockets.count * sizeof Socket.Handle))[0 .. sockets.count];
for i: sockets.count {
handles[i] = sockets[i].handle;
}
- handles_changed := cast(^i32) alloc.from_stack(sockets.count * sizeof i32);
- num_changed := __net_poll_recv(handles, timeout, handles_changed);
-
- if changed_buffer.count == 0 do return .[];
- assert(changed_buffer.count >= num_changed, "Not enough space to write back changed sockets.");
-
- changed_buffer.count = num_changed;
- memory.copy(changed_buffer.data, handles_changed, num_changed * sizeof i32);
-
- return changed_buffer;
+ __net_poll_recv(handles, timeout, stat_buff.data);
}
socket_send :: (s: ^Socket, data: [] u8) -> i32 {
#package __net_sendto :: (handle: Socket.Handle, data: [] u8, addr: ^Socket_Address) -> i32 ---
#package __net_recv :: (handle: Socket.Handle, data: [] u8, async_would_block: ^bool) -> i32 ---
#package __net_recvfrom :: (handle: Socket.Handle, data: [] u8, out_recv_addr: ^Socket_Address, async_would_block: ^bool) -> i32 ---
- #package __net_poll_recv :: (handle: [] Socket.Handle, timeout: i32, out_recv_indicies: ^i32) -> i32 ---
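+ // Writes one Socket_Poll_Status per handle; the buffer behind
+ // out_statuses must hold at least handle.count entries.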
+ #package __net_poll_recv :: (handle: [] Socket.Handle, timeout: i32, out_statuses: ^Socket_Poll_Status) -> void ---
#package __net_host_to_net_s :: (s: u16) -> u16 ---
#package __net_host_to_net_l :: (s: u32) -> u32 ---
Connection;
Disconnection;
Data;
+ Ready;
}
Connection :: struct {
contents: [] u8;
}
+
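+ // Emitted in place of Data events when emit_data_events is false;
+ // signals that the socket has data waiting for the receiver to
+ // read directly.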
+ Ready :: struct {
+ address: ^Socket_Address;
+ // This is only set when the event is coming from the server.
+ client : ^TCP_Server.Client;
+ }
}
tcp_get_events :: (use conn: ^TCP_Connection) -> Iterator(TCP_Event) {
address : Socket_Address;
state : State;
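+ // Set once a Ready event has been queued for this client, so
+ // duplicate Ready events are not emitted.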
+ recv_ready_event_present := false;
+
State :: enum {
Alive;
+ Being_Killed;
Dying;
Dead;
}
alive := true;
pulse_time_ms := 500;
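+ // When false, the server emits Ready events and leaves reading the
+ // socket to the caller, instead of reading it and emitting Data
+ // events itself.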
+ emit_data_events := true;
+
get_events :: tcp_get_events
listen :: tcp_server_listen
pulse :: tcp_server_pulse
send :: tcp_server_send
broadcast :: tcp_server_broadcast
handle_events :: tcp_server_handle_events
+ kill_client :: tcp_server_kill_client
}
tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_Server {
#local tcp_server_listener :: (use server: ^TCP_Server) {
while server.alive {
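+ // At capacity: back off briefly instead of accepting a client
+ // there is no room to track.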
+ if server.client_count == server.clients.count {
+ os.sleep(100);
+ continue;
+ }
+
client_socket, client_addr := socket->accept();
client := new(TCP_Server.Client, allocator=client_allocator);
for^ clients {
client := *it;
if client == null do continue;
- if client.state == .Dying {
- // (*it).state = .Dead;
- raw_free(server.client_allocator, client);
- *it = null;
- server.client_count -= 1;
+ switch client.state {
+ case .Being_Killed {
+ // "Being_Killed" is an intermediate shutdown phase. Previously, the
+ // socket was only ever read directly inside this procedure, so a
+ // client could be placed straight into the Dying state. Now callers
+ // can opt into "Ready" events, read the socket's data themselves,
+ // and force-kill the connection with server->kill_client() (see the
+ // usage sketch after tcp_server_kill_client below). If kill_client
+ // moved the client directly to Dying, this cleanup pass would free
+ // it before the loop below ever saw it, and its Disconnection event
+ // would never be emitted. "Being_Killed" delays the free by one
+ // pass. TLDR: this is a hack; refactor it.
+ client.state = .Dying;
+ }
+
+ case .Dying {
+ raw_free(server.client_allocator, client);
+ *it = null;
+ server.client_count -= 1;
+ }
}
}
for clients_with_messages {
if it.state != .Alive do continue;
- msg_buffer: [1024] u8;
- bytes_read := it.socket->recv_into(msg_buffer);
+ if server.emit_data_events {
+ msg_buffer: [1024] u8;
+ bytes_read := it.socket->recv_into(msg_buffer);
+
+ // A read of exactly 0 bytes means the client has shut down and
+ // future communication should be terminated.
+ //
+ // A negative byte count means an error occurred, and the client
+ // should likewise be marked as dead.
+ if bytes_read <= 0 {
+ tcp_server_kill_client(server, it);
+ continue;
+ }
- // If exactly 0 bytes are read from the buffer, it means that the
- // client has shutdown and future communication should be terminated.
- //
- // If a negative number of bytes are read, then an error has occured
- // and the client should also be marked as dead.
- if bytes_read <= 0 {
- tcp_server_kill_client(server, it);
- continue;
+ data_event := new(TCP_Event.Data, allocator=server.event_allocator);
+ data_event.client = it;
+ data_event.address = ^it.address;
+ data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
+ server.events << .{ .Data, data_event }; @Threading // See comment above.
+
+ } elseif !it.recv_ready_event_present {
+ it.recv_ready_event_present = true;
+ ready_event := new(TCP_Event.Ready, allocator=server.event_allocator);
+ ready_event.client = it;
+ ready_event.address = ^it.address;
+ server.events << .{ .Ready, ready_event }; @Threading // See comment above.
}
-
- data_event := new(TCP_Event.Data, allocator=server.event_allocator);
- data_event.client = it;
- data_event.address = ^it.address;
- data_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
- server.events << .{ .Data, data_event }; @Threading // See comment above.
}
for clients {
if it == null do continue;
- if it.state != .Alive {
+ if it.state == .Dying {
disconnect_event := new(TCP_Event.Disconnection, allocator=server.event_allocator);
disconnect_event.client = it;
disconnect_event.address = ^it.address;
}
}
+tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
+ client.state = .Being_Killed;
+ client.socket->close();
+ client.socket.vtable = null;
+}
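+// A rough sketch of the intended flow when Data events are disabled.
+// (How the .Ready event is actually delivered is elided here; the
+// names below all come from this module.)
+//
+//     server := tcp_server_make();
+//     server.emit_data_events = false;
+//
+//     // ... upon a .Ready event carrying `client` ...
+//     buffer: [1024] u8;
+//     bytes_read := client.socket->recv_into(buffer);
+//     if bytes_read <= 0 do server->kill_client(client);
+//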
+
+
//
// TCP Client
-#local {
- tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
- client.state = .Dying;
- client.socket.vtable = null;
- }
-
- wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
- active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
- active_clients.count = 0;
+#local
+wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
+ active_clients := alloc.array_from_stack(^TCP_Server.Client, client_count);
+ active_clients.count = 0;
- for clients {
- if it == null do continue;
+ for clients {
+ if it == null do continue;
- if it.state == .Alive {
- active_clients[active_clients.count] = it;
- active_clients.count += 1;
- }
+ if it.state == .Alive {
+ active_clients[active_clients.count] = it;
+ active_clients.count += 1;
}
+ }
- changed_buffer := alloc.array_from_stack(i32, client_count);
- changed := socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, changed_buffer);
+ status_buffer := alloc.array_from_stack(core.net.Socket_Poll_Status, client_count);
+ socket_poll_all(cast([] ^Socket) active_clients, pulse_time_ms, status_buffer);
- recv_clients: [..] ^TCP_Server.Client;
- for changed {
+ recv_clients: [..] ^TCP_Server.Client;
+ for it: active_clients.count {
+ if status_buffer[it] == .Readable {
recv_clients << active_clients[it];
}
- return recv_clients;
+ if status_buffer[it] == .Closed {
+ tcp_server_kill_client(server, active_clients[it]);
+ }
}
+
+ return recv_clients;
}
return NULL;
}
-ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), (WASM_I32)) {
+ONYX_DEF(__net_poll_recv, (WASM_I32, WASM_I32, WASM_I32, WASM_I32), ()) {
#ifdef _BH_LINUX
- int i, res, cursor;
+ int i, res;
struct pollfd* fds;
- fds = alloca(params->data[1].of.i32 * 8); // Guessed size of pollfd
+ fds = alloca(params->data[1].of.i32 * sizeof(struct pollfd));
for (i=0; i<params->data[1].of.i32; i++) {
fds[i].fd = *(i32 *) ONYX_PTR(params->data[0].of.i32 + 4 * i);
- fds[i].events = POLLIN;
+ fds[i].events = -1; // Request every event type, not just POLLIN, so hangups and errors are surfaced too.
fds[i].revents = 0;
}
res = poll(fds, params->data[1].of.i32, params->data[2].of.i32);
- cursor = 0;
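+ // The values written back must stay in sync with Socket_Poll_Status
+ // on the Onyx side: 0 = No_Change, 1 = Readable, 2 = Closed.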
for (i=0; i<params->data[1].of.i32; i++) {
+ *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 0; // NO_CHANGE
+
if (fds[i].revents & POLLIN) {
- *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * cursor) = i;
- cursor++;
+ *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 1; // READABLE
+ }
+
+ if (fds[i].revents & (POLLHUP | POLLNVAL | POLLERR)) {
+ *(i32 *) ONYX_PTR(params->data[3].of.i32 + 4 * i) = 2; // CLOSED
+ }
}
- results->data[0] = WASM_I32_VAL(cursor);
#endif
return NULL;