From 38ea0563f440d2a9db21837eda5175bec71b632b Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Tue, 18 Jan 2022 22:09:50 -0600 Subject: [PATCH] basics of generic TCP server --- core/net/tcp.onyx | 256 ++++++++++++++++++++++++++++++++++++++++++++++ core/std.onyx | 2 + 2 files changed, 258 insertions(+) create mode 100644 core/net/tcp.onyx diff --git a/core/net/tcp.onyx b/core/net/tcp.onyx new file mode 100644 index 00000000..eeacbdf0 --- /dev/null +++ b/core/net/tcp.onyx @@ -0,0 +1,256 @@ +package core.net + +#local { + runtime :: package runtime + sync :: package core.sync + thread :: package core.thread + array :: package core.array + memory :: package core.memory + alloc :: package core.alloc + + use package core.intrinsics.onyx { __zero_value } +} + +#if !runtime.Multi_Threading_Enabled { + #error "Expected multi-threading to be enabled for TCP server."; +} + +TCP_Connection :: struct { + socket: Socket; + + event_allocator: Allocator; + events: [..] TCP_Event; + event_cursor := 0; +} + +TCP_Server :: struct { + use connection: TCP_Connection; + + Client :: struct { + socket : Socket; + address : Socket_Address; + state : State; + + State :: enum { + Alive; + Dying; + Dead; + } + } + client_allocator: Allocator; + clients: [] ^Client; + + // max clients is stored as clients.count. + client_count: u32; + + listener_thread: thread.Thread; + + get_events :: tcp_get_events + listen :: tcp_server_listen + alive :: tcp_server_pulse +} + +TCP_Client :: struct { + use connection: TCP_Connection; + + get_events :: tcp_get_events +} + +TCP_Event :: struct { + kind: Kind; + data: rawptr; + + Kind :: enum { + Undefined; + Connection; + Disconnection; + Message; + } + + Connection :: struct { + address: ^Socket_Address; + } + + Disconnection :: struct { + address: ^Socket_Address; + } + + Message :: struct { + address: ^Socket_Address; + contents: [] u8; + } +} + +tcp_get_events :: (use conn: ^TCP_Connection) -> Iterator(TCP_Event) { + next :: (use conn: ^TCP_Connection) -> (TCP_Event, bool) { + if event_cursor == events.count do return __zero_value(TCP_Event), false; + + defer event_cursor += 1; + return events[event_cursor], true; + } + + close :: (use conn: ^TCP_Connection) { + for events { + switch it.kind { + case .Message { + raw_free(event_allocator, (cast(^TCP_Event.Message) it.data).contents.data); + } + + case .Disconnection { + // This is a weird place to free the client + } + } + + raw_free(event_allocator, it.data); + } + + array.clear(^events); + } + + conn.event_cursor = 0; + + return .{ + data = conn, + next = next, + close = close, + }; +} + +tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_Server { + socket, err := socket_create(.Inet, .Stream); // IPv6? + if err != .None do return null; + + server := make(TCP_Server, allocator=allocator); + server.socket = socket; + server.event_allocator = allocator; + + server.client_count = 0; + server.client_allocator = allocator; + memory.alloc_slice(^server.clients, max_clients, allocator=allocator); + memory.fill_slice(server.clients, null); + + return server; +} + +#local tcp_server_listener :: (use server: ^TCP_Server) { + @ServerAlive + while true { + client_socket, client_addr := socket->accept(); + + client := new(TCP_Server.Client, allocator=client_allocator); + client.state = .Alive; + client.socket = client_socket; + client.address = client_addr; + + for^ clients do if *it == null { *it = client; break; } + server.client_count += 1; + + conn_event := new(TCP_Event.Connection, allocator=server.event_allocator); + conn_event.address = ^client.address; + server.events << .{ .Connection, conn_event }; + } +} + +tcp_server_listen :: (use server: ^TCP_Server, port: u16) -> bool { + if !socket->bind(port) do return false; + + socket->listen(); + thread.spawn(^listener_thread, server, tcp_server_listener); +} + +tcp_server_pulse :: (use server: ^TCP_Server) -> bool { + for^ clients { + client := *it; + if client == null do continue; + if client.state == .Dying { + // (*it).state = .Dead; + raw_free(server.client_allocator, client); + *it = null; + server.client_count -= 1; + } + } + + clients_with_messages := wait_to_get_client_messages(server); + defer if clients_with_messages.data != null do cfree(clients_with_messages.data); + + for clients_with_messages { + if it.state != .Alive do continue; + + msg_buffer: [1024] u8; + bytes_read := it.socket->recv_into(msg_buffer); + + // If exactly 0 bytes are read from the buffer, it means that the + // client has shutdown and future communication should be terminated. + // + // If a negative number of bytes are read, then an error has occured + // and the client should also be marked as dead. + if bytes_read <= 0 { + tcp_server_kill_client(server, it); + continue; + } + + msg_event := new(TCP_Event.Message, allocator=server.event_allocator); + msg_event.address = ^it.address; + msg_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator); + server.events << .{ .Message, msg_event }; + } + + for clients { + if it.state != .Alive { + disconnect_event := new(TCP_Event.Disconnection, allocator=server.event_allocator); + disconnect_event.address = ^it.address; + server.events << .{ .Disconnection, disconnect_event }; + } + } + + array.sort(clients, (a, b) => { + a_val := 1 if a == null else 0; + b_val := 1 if b == null else 0; + + if a_val != b_val do return b_val - a_val; + + return cast(i32) a.state - cast(i32) b.state; + }); + + client_count = array.count_where(clients, x => x != null); + + return true; +} + +#local { + tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) { + client.state = .Dying; + client.socket.vtable = null; + } + + wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client { + // This mapping was pulled from another code piece. It is entirely + // possible that this mapping no longer needs to happen. + + active_clients: [..] i32; + for i: clients.count { + if clients[i] == null do continue; + + if clients[i].state == .Alive { + active_clients << i; + } + } + defer if active_clients.data != null do array.free(^active_clients); + + poll_sockets: [..] ^Socket; + for active_clients { + poll_sockets << ^clients[it].socket; + } + defer if poll_sockets.data != null do array.free(^poll_sockets); + + changed_buffer := cast(^i32) alloc.from_stack(client_count * sizeof i32); + changed := socket_poll_all(poll_sockets, 500, changed_buffer[0 .. client_count]); + + recv_clients: [..] ^TCP_Server.Client; + for changed { + recv_clients << clients[active_clients[it]]; + } + + return recv_clients; + } +} diff --git a/core/std.onyx b/core/std.onyx index bfd4775f..4c672e32 100644 --- a/core/std.onyx +++ b/core/std.onyx @@ -50,7 +50,9 @@ package core #load "./runtime/onyx_run" #load "./os/process" #load "./os/onyx_fs" + #load "./net/net" + #load "./net/tcp" } #if runtime.runtime == .Wasi { #load "./wasi/wasi" -- 2.25.1