added disconnection and resending commands
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Fri, 1 Apr 2022 21:42:53 +0000 (16:42 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Fri, 1 Apr 2022 21:42:53 +0000 (16:42 -0500)
src/host.onyx
src/module.onyx
src/peer.onyx
udp_client.onyx
udp_server.onyx

index 11d78c1c0ed559382c894fda4bb79e476eff2860..cbcdb80c52d3fce42664e9b08198ecde0ed17da3 100644 (file)
@@ -65,6 +65,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea
     host.mtu = Host_Default_MTU;
     host.ping_interval = 3000;
     host.no_ack_disconnect_timeout = 10000;
+    host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF);
 
     for^ peer: host.peers {
         peer.host = host;
@@ -73,6 +74,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea
         peer.incoming_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
         peer.outgoing_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
         peer.mtu = host.mtu;
+        peer.no_ack_resend_timeout = 50;
     }
 
     return host, .None;
@@ -95,13 +97,9 @@ host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) ->
     peer.addr = *addr;
     peer.connect_id = random.int();
     peer.mtu = Host_Default_MTU;
+    peer.last_acknowledge_time = host.current_time;
 
-    for^ channel: peer.channels {
-        channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
-        channel.seq_number = 0;
-        channel.max_reliable_windows = 16;
-        memory.alloc_slice(^channel.reliable_windows, channel.max_reliable_windows);
-    }
+    for^ peer.channels do peer_setup_channel(peer, it);
     
     command := new(Protocol_Connect);
     command.command = .Connect;
@@ -135,8 +133,8 @@ host_pulse :: (host: ^Host, timeout: u32 = 0) -> bool {
     host.produced_event = false;
     host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF);
 
-    sent_command     := host_send_commands(host);
-    received_command := host_receive_commands(host, timeout);
+    host_send_commands(host);
+    host_receive_commands(host, timeout);
 
     return host.produced_event;
 }
@@ -160,26 +158,28 @@ host_send_commands :: (host: ^Host) -> bool {
 
         time_diff := math.abs(cast(i64) peer.last_acknowledge_time - cast(i64) host.current_time);
 
-        if peer.state == .Connected {
-            // Peer pinging
-            if time_diff >= ~~host.ping_interval {
-                peer_send_ping(peer);
-            }
+        // Peer pinging
+        if time_diff >= ~~host.ping_interval {
+            peer_send_ping(peer);
+        }
 
-            // Detect a disconnected peer
-            if time_diff >= ~~host.no_ack_disconnect_timeout {
-                peer_send_disconnect(peer);
-                peer_flush_outgoing_commands(peer);
-                peer_disconnect(peer);
+        // Detect a disconnected peer
+        if time_diff >= ~~host.no_ack_disconnect_timeout {
+            peer_send_disconnect(peer);
+            peer_flush_outgoing_commands(peer);
+            peer_disconnect(peer);
 
-                host_notify_disconnect(host, peer);
-                continue;
-            }
+            host_notify_disconnect(host, peer);
+            continue;
         }
 
         // Packet loss calcuations
+
+
         // Dropped packet detection
+        peer_check_outgoing_commands(peer);
 
+        // Sending commands
         peer_flush_outgoing_commands(peer);
     }
 }
@@ -255,20 +255,19 @@ host_handle_incoming_commands :: (host: ^Host) -> bool {
             }
 
             case .Disconnect {
+                if peer == null do return_block();
+                host_handle_disconnect_command(host, peer, ~~ command);
             }
 
             case .Ping {
                 // This just requires an acknowledgement
             }
 
-            case .Send_Reliable {
+            case .Send_Reliable, .Send_Unreliable {
                 if peer == null do return_block();
                 host_handle_send_reliable_command(host, peer, ~~ command, ^current_data);
             }
 
-            case .Send_Unreliable {
-            }
-
             case .Send_Unsequenced {
             }
         }
@@ -421,7 +420,7 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
 
     channel_count := net.network_to_host(command.channel_count);
     memory.alloc_slice(^peer.channels, channel_count);
-    memory.set(peer.channels.data, 0, channel_count * sizeof Channel);
+    for ^peer.channels do peer_setup_channel(peer, it);
 
     peer.state = .Acknowledging_Connection;
     peer.connect_id = net.network_to_host(command.connect_id);
@@ -466,6 +465,11 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
     return;
 }
 
+#local host_handle_disconnect_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Disconnect) {
+    peer.state = .Disconnected;
+    host_notify_disconnect(host, peer);
+}
+
 //
 // This is slightly misnamed, as it is actually handling a received reliable message.
 #local host_handle_send_reliable_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send, data: ^[] u8) {
@@ -477,9 +481,20 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
     if data_length > data.count {
         return;
     }
+    defer string.advance(data, data_length);
+
+    if command_get_effective(command.command) == .Send_Reliable {
+        channel := ^peer.channels[command.channel];
+        for channel.reliable_windows {
+            if command.seq_number == it do return;
+        }
+
+        channel.reliable_windows[channel.reliable_windows_cursor] = command.seq_number;
+        channel.reliable_windows_cursor += 1;
+        channel.reliable_windows_cursor %= channel.reliable_windows.count;
+    }
 
     host_notify_message(host, peer, command.channel, *data);
-    string.advance(data, data_length);
 }
 
 
index 6db6c8a8c1f3ad5c05e4cf914abaff1257f2004f..4bee62e28ff84256e0282d1de96e1e1344994ab2 100644 (file)
@@ -9,6 +9,8 @@ package onyx_net
     string :: package core.string
     random :: package core.random
     os     :: package core.os
+    math   :: package core.math
+    iter   :: package core.iter
 }
 
 #load_all "./."
index e432ffad658ed9990c1ad6d6b0f54a8c229b7a9f..f5a0139679a13b0b9f3108c06d6b5b3c6facef91 100644 (file)
@@ -43,6 +43,8 @@ Peer :: struct {
     // Unix time of last message that was acknowledge to be received.
     last_acknowledge_time: u32;
 
+    no_ack_resend_timeout: u32;
+
     outgoing_reliable_seq_number: u16;
     incoming_reliable_seq_number: u16;
     outgoing_unsequenced_group:   u32;
@@ -67,6 +69,14 @@ peer_disconnect :: (peer: ^Peer) {
     peer_destroy(peer);
 }
 
+peer_setup_channel :: (peer: ^Peer, channel: ^Channel) {
+    channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
+    channel.seq_number = 0;
+    channel.reliable_windows_cursor = 0;
+    memory.alloc_slice(^channel.reliable_windows, 32);
+    for ^channel.reliable_windows do *it = 65535;
+}
+
 peer_queue_outgoing_command :: #match {
     (peer: ^Peer, command: ^Protocol_Command_Header, packet: ^Packet = null) -> ^Outgoing_Command {
         out := new(Outgoing_Command);
@@ -146,6 +156,16 @@ peer_queue_acknowledgement :: (peer: ^Peer, command: ^Protocol_Command_Header, s
     peer.acknowledgements << .{ sent_time, command };
 }
 
+peer_check_outgoing_commands :: (peer: ^Peer) {
+    for iter.as_iterator(^peer.sent_reliable_commands) {
+        time_diff := math.abs(cast(i64) it.sent_time - cast(i64) peer.host.current_time);
+        if time_diff >= ~~ peer.no_ack_resend_timeout {
+            peer.outgoing_commands << it;
+            #remove;
+        }
+    }
+}
+
 peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 {
     send_buffer: [65535] u8;
 
@@ -217,14 +237,12 @@ peer_send_disconnect :: (peer: ^Peer) {
 
 peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channel_ID) -> Command {
     command: ^Outgoing_Command;
-    index := 0;
-    for peer.sent_reliable_commands {
+    for iter.as_iterator(^peer.sent_reliable_commands) {
         if it.reliable_seq_number == seq_num && it.command.channel == channel {
             command = it;
+            #remove;
             break;
         }
-
-        index += 1;
     }
 
     if command == null do return .None;
@@ -232,7 +250,6 @@ peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channe
     defer {
         cfree(command.command);
         cfree(command);
-        array.fast_delete(^peer.sent_reliable_commands, index);
     }
     return command_get_effective(command.command.command);
 }
@@ -245,12 +262,11 @@ Channel :: struct {
     seq_number: u16;
 
     reliable_windows: [] u16;
-    max_reliable_windows: u32;
+    reliable_windows_cursor: u32;
 
     outgoing_unreliable_seq_number: u16;
     outgoing_reliable_seq_number: u16;
     incoming_reliable_seq_number: u16;
-
 }
 
 
index a31f049116ade4225520744b9df6d6011fbc4ebf..cb7764d986c3c43117b3d692ac2890caf623bc0d 100644 (file)
@@ -26,8 +26,40 @@ main :: (args) => {
     addr.port = 8080;
     peer := onyx_net.host_connect(host, ^addr, 2);
 
+    input_thread: thread.Thread;
+    td_type :: struct { peer: ^onyx_net.Peer; } 
+    td := td_type.{ peer };
+
+    thread.spawn(^input_thread, ^td, (data) => {
+        peer := data.peer;
+        input_reader := io.reader_make(^stdin);
+
+        while true {
+            line := io.read_line(^input_reader) |> string.strip_whitespace();
+            
+            switch line {
+                case "disconnect" {
+                    println("Disconnecting");
+                    onyx_net.peer_send_disconnect(peer);
+                    onyx_net.peer_flush_outgoing_commands(peer);
+                    onyx_net.peer_disconnect(peer);
+                    break;
+                }
+
+                case "send" {
+                    packet := new(onyx_net.Packet);
+                    packet.flags |= .Reliable;
+                    packet.data = "What's Up?";
+                    println("Sending what's up...");
+
+                    onyx_net.peer_send(peer, 0, packet);
+                }
+            }
+        }
+    });
+
     while true {
-        for host->get_events(timeout=500) {
+        for host->get_events(timeout=100) {
             printf("{*}\n", it);
 
             if it.type == .Connection {
@@ -48,13 +80,21 @@ main :: (args) => {
             }
         }
 
-        if random.between(0, 3) == 0 {
-            packet := new(onyx_net.Packet);
-            packet.flags |= .Reliable;
-            packet.data = "What's Up?";
-            println("Sending what's up...");
+        // if random.between(0, 3) == 0 {
+        //     packet := new(onyx_net.Packet);
+        //     packet.flags |= .Reliable;
+        //     packet.data = "What's Up?";
+        //     println("Sending what's up...");
 
-            onyx_net.peer_send(peer, 0, packet);
-        }
+        //     onyx_net.peer_send(peer, 0, packet);
+        // }
+
+        // if random.between(0, 100) == 4 {
+        //     println("Disconnecting");
+        //     onyx_net.peer_send_disconnect(peer);
+        //     onyx_net.peer_flush_outgoing_commands(peer);
+        //     onyx_net.peer_disconnect(peer);
+        //     break;
+        // }
     }
 }
index e14fc6707e66ad05edd118ec2971dcd1f2a18725..cf076c90d2921475899477eb904b0dcd27387e03 100644 (file)
@@ -29,7 +29,7 @@ main :: (args) => {
             }
 
             if it.type == .Disconnection {
-                printf("Disconnection: {}   {}\n", it.peer.addr, it.peer);
+                printf("Disconnection: {}\n", it.peer.addr);
             }
 
             if it.type == .Message {