basics of generic TCP server
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Wed, 19 Jan 2022 04:09:50 +0000 (22:09 -0600)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Wed, 19 Jan 2022 04:09:50 +0000 (22:09 -0600)
core/net/tcp.onyx [new file with mode: 0644]
core/std.onyx

diff --git a/core/net/tcp.onyx b/core/net/tcp.onyx
new file mode 100644 (file)
index 0000000..eeacbdf
--- /dev/null
@@ -0,0 +1,256 @@
+package core.net
+
+#local {
+    runtime :: package runtime
+    sync    :: package core.sync
+    thread  :: package core.thread
+    array   :: package core.array
+    memory  :: package core.memory
+    alloc   :: package core.alloc
+
+    use package core.intrinsics.onyx { __zero_value }
+}
+
+#if !runtime.Multi_Threading_Enabled {
+    #error "Expected multi-threading to be enabled for TCP server.";
+}
+
+TCP_Connection :: struct {
+    socket: Socket;
+
+    event_allocator: Allocator;
+    events: [..] TCP_Event;
+    event_cursor := 0;
+}
+
+TCP_Server :: struct {
+    use connection: TCP_Connection;
+
+    Client :: struct {
+        socket  : Socket;
+        address : Socket_Address;
+        state   : State;
+
+        State :: enum {
+            Alive;
+            Dying;
+            Dead;
+        }
+    }
+    client_allocator: Allocator;
+    clients: [] ^Client;
+
+    // max clients is stored as clients.count.
+    client_count: u32;
+
+    listener_thread: thread.Thread;
+    
+    get_events :: tcp_get_events
+    listen     :: tcp_server_listen
+    alive      :: tcp_server_pulse
+}
+
+TCP_Client :: struct {
+    use connection: TCP_Connection;
+
+    get_events :: tcp_get_events
+}
+
+TCP_Event :: struct {
+    kind: Kind;
+    data: rawptr;
+
+    Kind :: enum {
+        Undefined;
+        Connection;
+        Disconnection;
+        Message;
+    }
+
+    Connection :: struct {
+        address: ^Socket_Address;
+    }
+
+    Disconnection :: struct {
+        address: ^Socket_Address;
+    }
+
+    Message :: struct {
+        address: ^Socket_Address;
+        contents: [] u8;
+    }
+}
+
+tcp_get_events :: (use conn: ^TCP_Connection) -> Iterator(TCP_Event) {
+    next :: (use conn: ^TCP_Connection) -> (TCP_Event, bool) {
+        if event_cursor == events.count do return __zero_value(TCP_Event), false;
+
+        defer event_cursor += 1;
+        return events[event_cursor], true;
+    }
+
+    close :: (use conn: ^TCP_Connection) {
+        for events {
+            switch it.kind {
+                case .Message {
+                    raw_free(event_allocator, (cast(^TCP_Event.Message) it.data).contents.data);
+                }
+
+                case .Disconnection {
+                    // This is a weird place to free the client
+                }
+            }
+
+            raw_free(event_allocator, it.data);
+        }
+
+        array.clear(^events);
+    }
+
+    conn.event_cursor = 0;
+
+    return .{
+        data  = conn,
+        next  = next,
+        close = close,
+    };
+}
+
+tcp_server_make :: (max_clients := 32, allocator := context.allocator) -> ^TCP_Server {
+    socket, err := socket_create(.Inet, .Stream); // IPv6?
+    if err != .None do return null;
+
+    server := make(TCP_Server, allocator=allocator);
+    server.socket = socket;
+    server.event_allocator = allocator;
+
+    server.client_count = 0;
+    server.client_allocator = allocator;
+    memory.alloc_slice(^server.clients, max_clients, allocator=allocator);
+    memory.fill_slice(server.clients, null);
+
+    return server;
+}
+
+#local tcp_server_listener :: (use server: ^TCP_Server) {
+    @ServerAlive
+    while true {
+        client_socket, client_addr := socket->accept();
+
+        client := new(TCP_Server.Client, allocator=client_allocator);
+        client.state = .Alive;
+        client.socket = client_socket;
+        client.address = client_addr;
+
+        for^ clients do if *it == null { *it = client; break; }
+        server.client_count += 1;
+
+        conn_event := new(TCP_Event.Connection, allocator=server.event_allocator);
+        conn_event.address = ^client.address;
+        server.events << .{ .Connection, conn_event };
+    }
+}
+
+tcp_server_listen :: (use server: ^TCP_Server, port: u16) -> bool {
+    if !socket->bind(port) do return false;
+
+    socket->listen();
+    thread.spawn(^listener_thread, server, tcp_server_listener);
+}
+
+tcp_server_pulse :: (use server: ^TCP_Server) -> bool {
+    for^ clients {
+        client := *it;
+        if client == null do continue;
+        if client.state == .Dying {
+            // (*it).state = .Dead;
+            raw_free(server.client_allocator, client);
+            *it = null;
+            server.client_count -= 1;
+        }
+    }
+
+    clients_with_messages := wait_to_get_client_messages(server);
+    defer if clients_with_messages.data != null do cfree(clients_with_messages.data);
+
+    for clients_with_messages {
+        if it.state != .Alive do continue;
+
+        msg_buffer: [1024] u8;
+        bytes_read := it.socket->recv_into(msg_buffer);
+
+        // If exactly 0 bytes are read from the buffer, it means that the 
+        // client has shutdown and future communication should be terminated.
+        //
+        // If a negative number of bytes are read, then an error has occured
+        // and the client should also be marked as dead.
+        if bytes_read <= 0 {
+            tcp_server_kill_client(server, it);
+            continue;
+        }
+
+        msg_event := new(TCP_Event.Message, allocator=server.event_allocator);
+        msg_event.address = ^it.address;
+        msg_event.contents = memory.copy_slice(msg_buffer[0 .. bytes_read], allocator=server.event_allocator);
+        server.events << .{ .Message, msg_event };
+    }
+
+    for clients {
+        if it.state != .Alive {
+            disconnect_event := new(TCP_Event.Disconnection, allocator=server.event_allocator);
+            disconnect_event.address = ^it.address;
+            server.events << .{ .Disconnection, disconnect_event };
+        }
+    }
+
+    array.sort(clients, (a, b) => {
+        a_val := 1 if a == null else 0; 
+        b_val := 1 if b == null else 0; 
+
+        if a_val != b_val do return b_val - a_val;
+        
+        return cast(i32) a.state - cast(i32) b.state;
+    });
+
+    client_count = array.count_where(clients, x => x != null);
+
+    return true;
+}
+
+#local {
+    tcp_server_kill_client :: (use server: ^TCP_Server, client: ^TCP_Server.Client) {
+        client.state = .Dying;
+        client.socket.vtable = null;
+    }
+
+    wait_to_get_client_messages :: (use server: ^TCP_Server) -> [] ^TCP_Server.Client {
+        // This mapping was pulled from another code piece. It is entirely
+        // possible that this mapping no longer needs to happen.
+
+        active_clients: [..] i32;
+        for i: clients.count {
+            if clients[i] == null do continue;
+
+            if clients[i].state == .Alive {
+                active_clients << i;
+            }
+        }
+        defer if active_clients.data != null do array.free(^active_clients);
+
+        poll_sockets: [..] ^Socket;
+        for active_clients {
+            poll_sockets << ^clients[it].socket;
+        }
+        defer if poll_sockets.data != null do array.free(^poll_sockets);
+        
+        changed_buffer := cast(^i32) alloc.from_stack(client_count * sizeof i32);
+        changed := socket_poll_all(poll_sockets, 500, changed_buffer[0 .. client_count]);
+
+        recv_clients: [..] ^TCP_Server.Client;
+        for changed {
+            recv_clients << clients[active_clients[it]];
+        }
+
+        return recv_clients;
+    }
+}
index bfd4775f6bf443627780b29ecb4b9e1d9863cb94..4c672e32f170108eae5c865939a9f3872a1392a3 100644 (file)
@@ -50,7 +50,9 @@ package core
     #load "./runtime/onyx_run"
     #load "./os/process"
     #load "./os/onyx_fs"
+
     #load "./net/net"
+    #load "./net/tcp"
 }
 #if runtime.runtime == .Wasi   {
     #load "./wasi/wasi"