added fragmented reliable data
authorBrendan Hansen <brendan.f.hansen@gmail.com>
Sat, 2 Apr 2022 01:30:49 +0000 (20:30 -0500)
committerBrendan Hansen <brendan.f.hansen@gmail.com>
Sat, 2 Apr 2022 01:30:49 +0000 (20:30 -0500)
doc.md
src/host.onyx
src/packet.onyx
src/peer.onyx
src/protocol.onyx
udp_client.onyx
udp_server.onyx

diff --git a/doc.md b/doc.md
index b5760463c691957bcfd042132e5dfebac790d255..3aa18ff119fbdccb03e466b472e2462c504a5b14 100644 (file)
--- a/doc.md
+++ b/doc.md
@@ -26,3 +26,9 @@ Each packet is at most 1400 bytes in size.
 | peerID | sentTime | checksum | channelID | commandID | data 
            OPTIONAL   OPTIONAL
 
+
+Currently, these features are not supported:
+    [ ] Packets are not necessarily sequenced. A dropped packet followed
+        by a successful packet with be out of order.
+    
+    [x] Framemented reliable data.
\ No newline at end of file
index cbcdb80c52d3fce42664e9b08198ecde0ed17da3..538da4a561aec80fb4ff70e856f33e701e238f61 100644 (file)
@@ -58,7 +58,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea
         }
     }
 
-    host.socket->setting(.NonBlocking, 1);    
+    host.socket->setting(.NonBlocking, 1);
     host.socket->setting(.Broadcast, 1);
 
     host.addr = *addr;
@@ -82,7 +82,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea
 
 host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) -> ^Peer {
     peer: ^Peer;
-    
+
     // Find first peer that is in the disconnected state.
     for^ host.peers do if it.state == .Disconnected {
         peer = it;
@@ -100,7 +100,7 @@ host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) ->
     peer.last_acknowledge_time = host.current_time;
 
     for^ peer.channels do peer_setup_channel(peer, it);
-    
+
     command := new(Protocol_Connect);
     command.command = .Connect;
     command.command |= .Flag_Acknowledge;
@@ -205,7 +205,7 @@ host_receive_commands :: (host: ^Host, timeout: u32 = 0) -> bool {
 host_handle_incoming_commands :: (host: ^Host) -> bool {
     header: ^Protocol_Header = ~~ host.received_data.data;
     current_data := host.received_data;
-    
+
     peer_id := cast(u32) net.network_to_host(header.peer_id);
     peer: ^Peer;
 
@@ -229,7 +229,7 @@ host_handle_incoming_commands :: (host: ^Host) -> bool {
 
     while current_data.count >= sizeof Protocol_Command_Header {
         if host.produced_event do break;
-        
+
         command := cast(^Protocol_Command_Header) current_data.data;
         command_id := command_get_effective(command.command);
         command.seq_number = net.network_to_host(command.seq_number);
@@ -268,7 +268,9 @@ host_handle_incoming_commands :: (host: ^Host) -> bool {
                 host_handle_send_reliable_command(host, peer, ~~ command, ^current_data);
             }
 
-            case .Send_Unsequenced {
+            case .Send_Fragment {
+                if peer == null do return_block();
+                host_handle_send_fragment_command(host, peer, ~~ command, ^current_data);
             }
         }
 
@@ -289,7 +291,6 @@ host_handle_incoming_commands :: (host: ^Host) -> bool {
         }
     }
 
-    // host.event.peer = peer;
     return host.produced_event;
 }
 
@@ -310,7 +311,7 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
     ctx := new(Context);
     ctx.host = host;
     ctx.timeout = timeout;
-    return .{ ctx, next, null_proc };
+    return .{ ctx, next, cfree };
 }
 
 #local host_notify_connect :: (host: ^Host, peer: ^Peer) {
@@ -497,6 +498,58 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
     host_notify_message(host, peer, command.channel, *data);
 }
 
+#local host_handle_send_fragment_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send_Fragment, data: ^[] u8) {
+    if ~~command.channel > peer.channels.count || (peer.state != .Connected && peer.state != .Disconnect_Later) {
+        return;
+    }
+
+    data_length := cast(u32) net.network_to_host(command.data_length);
+    if data_length > data.count {
+        return;
+    }
+    defer string.advance(data, data_length);
+
+    @CopyPaste
+    channel := ^peer.channels[command.channel];
+    for channel.reliable_windows {
+        if command.seq_number == it do return;
+    }
+
+    channel.reliable_windows[channel.reliable_windows_cursor] = command.seq_number;
+    channel.reliable_windows_cursor += 1;
+    channel.reliable_windows_cursor %= channel.reliable_windows.count;
+
+    fragment_id := net.network_to_host(command.fragment_id);
+    fragment: ^Fragmented_Data = null;
+    for^ channel.pending_fragments {
+        if it.fragment_id == fragment_id {
+            fragment = it;
+            break;
+        }
+    }
+
+    if fragment == null {
+        fragment = array.alloc_one(^channel.pending_fragments);
+        fragment.fragment_id = fragment_id;
+        fragment.fragments_remaining = net.network_to_host(command.fragment_count);
+        memory.alloc_slice(^fragment.data, net.network_to_host(command.total_length));
+    }
+
+    memory.copy(fragment.data.data + net.network_to_host(command.fragment_offset), data.data, data_length);
+    fragment.fragments_remaining -= 1;
+
+    if fragment.fragments_remaining == 0 {
+        host_notify_message(host, peer, command.channel, fragment.data);
+
+        for iter.as_iterator(^channel.pending_fragments, by_pointer=true) {
+            if it.fragment_id == fragment_id {
+                #remove;
+                break;
+            }
+        }
+    }
+}
+
 
 @Relocate
 Event :: struct {
index a170e3a6b60d483f4798ed1bfb7f8fdb42004e1f..ace41865aeb2f3fdfbbe933937bb58cc1c658da9 100644 (file)
@@ -59,7 +59,17 @@ outgoing_command_pack_into_buffer :: (out: ^Outgoing_Command, peer: ^Peer, buf:
     write((cast(^u8) out.command)[0 .. command_size]);
 
     if out.packet != null {
-        write(out.packet.data);
+        if command_get_effective(out.command.command) == .Send_Fragment {
+            //
+            // Converting to network byte order just to switch back here is a little silly...
+            fragment := cast(^Protocol_Send_Fragment) out.command;
+            start := cast(u32) net.network_to_host(fragment.fragment_offset);
+            end   := start + ~~net.network_to_host(fragment.data_length);
+
+            write(out.packet.data[start .. end]);
+        } else {
+            write(out.packet.data);
+        }
     }
 
     return buf[0 .. write_offset], true;
index f5a0139679a13b0b9f3108c06d6b5b3c6facef91..80a273240e00a543636640573261bc7e10206067 100644 (file)
@@ -71,8 +71,10 @@ peer_disconnect :: (peer: ^Peer) {
 
 peer_setup_channel :: (peer: ^Peer, channel: ^Channel) {
     channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
-    channel.seq_number = 0;
+    channel.outgoing_reliable_seq_number = 0;
     channel.reliable_windows_cursor = 0;
+    channel.next_fragment_id = 0;
+
     memory.alloc_slice(^channel.reliable_windows, 32);
     for ^channel.reliable_windows do *it = 65535;
 }
@@ -130,9 +132,39 @@ peer_send :: (peer: ^Peer, channel_id: Channel_ID, packet: ^Packet) -> bool {
 
     channel := ^peer.channels[channel_id];
 
-    @TODO // Fragmented sending
     if packet.data.count > peer.mtu - sizeof Protocol_Header - sizeof Protocol_Send {
-        return false;
+        data_per_packet := peer.mtu - sizeof Protocol_Header - sizeof Protocol_Send_Fragment;
+
+        number_of_segments := packet.data.count / data_per_packet;
+        if packet.data.count % data_per_packet != 0 do number_of_segments += 1;
+
+        fragment_id := channel.next_fragment_id;
+        channel.next_fragment_id += 1;
+
+        data_remaining := packet.data.count;
+        for number_of_segments {
+            fragment := new(Protocol_Send_Fragment);
+            fragment.command = .Send_Fragment;
+            if packet.flags & .Reliable {
+                fragment.command |= .Flag_Acknowledge;
+            }
+
+            fragment.channel = channel_id;
+            fragment.data_length = net.host_to_network(cast(u16) math.min(data_remaining, data_per_packet));
+            fragment.fragment_id = net.host_to_network(fragment_id);
+            fragment.fragment_count = net.host_to_network(number_of_segments);
+            fragment.fragment_number = net.host_to_network(it);
+            fragment.total_length = net.host_to_network(packet.data.count);
+            fragment.fragment_offset = net.host_to_network(packet.data.count - data_remaining);
+
+            if peer_queue_outgoing_command(peer, fragment, packet) == null {
+                return false;
+            }
+
+            data_remaining -= cast(u32) net.network_to_host(fragment.data_length);
+        }
+
+        return true;
     }
 
     command := new(Protocol_Send);
@@ -142,9 +174,6 @@ peer_send :: (peer: ^Peer, channel_id: Channel_ID, packet: ^Packet) -> bool {
     if packet.flags & .Reliable {
         command.command = .Send_Reliable;
         command.command |= .Flag_Acknowledge;
-    } elseif packet.flags & .Unsequenced {
-        command.command = .Send_Unsequenced;
-        command.command |= .Flag_Unsequenced;
     } else {
         command.command = .Send_Unreliable;
     }
@@ -259,14 +288,19 @@ Channel_ID :: u8;
 
 Channel :: struct {
     id: Channel_ID;
-    seq_number: u16;
 
     reliable_windows: [] u16;
     reliable_windows_cursor: u32;
 
     outgoing_unreliable_seq_number: u16;
     outgoing_reliable_seq_number: u16;
-    incoming_reliable_seq_number: u16;
-}
 
+    next_fragment_id: u16;
+    pending_fragments: [..] Fragmented_Data;
+}
 
+Fragmented_Data :: struct {
+    fragment_id: u16;
+    data: [] u8;
+    fragments_remaining: u32;
+}
index 824ac399986e24527b998ad0d318d2419ad42dce..5e08004f188112d3fe7c12641e1db2e0b496ced3 100644 (file)
@@ -9,7 +9,7 @@ Command :: enum (u8) {
     Ping             :: 5;
     Send_Reliable    :: 6;
     Send_Unreliable  :: 7;
-    Send_Unsequenced :: 8;
+    Send_Fragment    :: 8;
 
     Flag_Acknowledge :: 1 << 6;
     Flag_Unsequenced :: 1 << 7;
@@ -59,6 +59,16 @@ Protocol_Send :: struct #pack {
     data_length: u16;
 }
 
+Protocol_Send_Fragment :: struct #pack {
+    use header: Protocol_Command_Header;
+    data_length     : u16;
+    fragment_id     : u16;
+    fragment_count  : u32;
+    fragment_number : u32;
+    total_length    : u32;
+    fragment_offset : u32;
+}
+
 Protocol_Disconnect :: Protocol_Command_Header;
 Protocol_Ping       :: Protocol_Command_Header;
 
@@ -71,5 +81,5 @@ command_sizes := u32.[
     /* Ping             */ sizeof Protocol_Ping,
     /* Send_Reliable    */ sizeof Protocol_Send,
     /* Send_Unreliable  */ sizeof Protocol_Send,
-    /* Send_Unsequenced */ sizeof Protocol_Send,
+    /* Send_Fragment    */ sizeof Protocol_Send_Fragment,
 ];
index cb7764d986c3c43117b3d692ac2890caf623bc0d..507545a49ee87f48ce906e623ac4cd48e3469f89 100644 (file)
@@ -60,7 +60,7 @@ main :: (args) => {
 
     while true {
         for host->get_events(timeout=100) {
-            printf("{*}\n", it);
+            // printf("{*}\n", it);
 
             if it.type == .Connection {
                 println("Successfully connected to server!");
@@ -72,12 +72,18 @@ main :: (args) => {
             }
 
             if it.type == .Message {
-                packet := new(onyx_net.Packet);
-                packet.flags |= .Reliable;
-                packet.data = "HELLO";
-
-                onyx_net.peer_send(it.peer, 0, packet);
+                print("Server says: ");
+                print(it.data);
+                print("\n");
             }
+
+            // if it.type == .Message {
+            //     packet := new(onyx_net.Packet);
+            //     packet.flags |= .Reliable;
+            //     packet.data = "HELLO";
+
+            //     onyx_net.peer_send(it.peer, 0, packet);
+            // }
         }
 
         // if random.between(0, 3) == 0 {
index cf076c90d2921475899477eb904b0dcd27387e03..184127a126f3139c4ed34ce536b695ed758ada71 100644 (file)
@@ -36,7 +36,7 @@ main :: (args) => {
                 if it.data == "What's Up?" {
                     packet := new(onyx_net.Packet);
                     packet.flags |= .Reliable;
-                    packet.data = "Not much";
+                    packet.data = #file_contents "src/host.onyx";
 
                     onyx_net.peer_send(it.peer, 0, packet);
                 }