From 1e9e12a6b11976cd9f2832f0cfd6300e96f0bbde Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Fri, 1 Apr 2022 20:30:49 -0500 Subject: [PATCH] added fragmented reliable data --- doc.md | 6 +++++ src/host.onyx | 69 +++++++++++++++++++++++++++++++++++++++++------ src/packet.onyx | 12 ++++++++- src/peer.onyx | 52 ++++++++++++++++++++++++++++------- src/protocol.onyx | 14 ++++++++-- udp_client.onyx | 18 ++++++++----- udp_server.onyx | 2 +- 7 files changed, 146 insertions(+), 27 deletions(-) diff --git a/doc.md b/doc.md index b576046..3aa18ff 100644 --- a/doc.md +++ b/doc.md @@ -26,3 +26,9 @@ Each packet is at most 1400 bytes in size. | peerID | sentTime | checksum | channelID | commandID | data OPTIONAL OPTIONAL + +Currently, these features are not supported: + [ ] Packets are not necessarily sequenced. A dropped packet followed + by a successful packet with be out of order. + + [x] Framemented reliable data. \ No newline at end of file diff --git a/src/host.onyx b/src/host.onyx index cbcdb80..538da4a 100644 --- a/src/host.onyx +++ b/src/host.onyx @@ -58,7 +58,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea } } - host.socket->setting(.NonBlocking, 1); + host.socket->setting(.NonBlocking, 1); host.socket->setting(.Broadcast, 1); host.addr = *addr; @@ -82,7 +82,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) -> ^Peer { peer: ^Peer; - + // Find first peer that is in the disconnected state. for^ host.peers do if it.state == .Disconnected { peer = it; @@ -100,7 +100,7 @@ host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) -> peer.last_acknowledge_time = host.current_time; for^ peer.channels do peer_setup_channel(peer, it); - + command := new(Protocol_Connect); command.command = .Connect; command.command |= .Flag_Acknowledge; @@ -205,7 +205,7 @@ host_receive_commands :: (host: ^Host, timeout: u32 = 0) -> bool { host_handle_incoming_commands :: (host: ^Host) -> bool { header: ^Protocol_Header = ~~ host.received_data.data; current_data := host.received_data; - + peer_id := cast(u32) net.network_to_host(header.peer_id); peer: ^Peer; @@ -229,7 +229,7 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { while current_data.count >= sizeof Protocol_Command_Header { if host.produced_event do break; - + command := cast(^Protocol_Command_Header) current_data.data; command_id := command_get_effective(command.command); command.seq_number = net.network_to_host(command.seq_number); @@ -268,7 +268,9 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { host_handle_send_reliable_command(host, peer, ~~ command, ^current_data); } - case .Send_Unsequenced { + case .Send_Fragment { + if peer == null do return_block(); + host_handle_send_fragment_command(host, peer, ~~ command, ^current_data); } } @@ -289,7 +291,6 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { } } - // host.event.peer = peer; return host.produced_event; } @@ -310,7 +311,7 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { ctx := new(Context); ctx.host = host; ctx.timeout = timeout; - return .{ ctx, next, null_proc }; + return .{ ctx, next, cfree }; } #local host_notify_connect :: (host: ^Host, peer: ^Peer) { @@ -497,6 +498,58 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { host_notify_message(host, peer, command.channel, *data); } +#local host_handle_send_fragment_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send_Fragment, data: ^[] u8) { + if ~~command.channel > peer.channels.count || (peer.state != .Connected && peer.state != .Disconnect_Later) { + return; + } + + data_length := cast(u32) net.network_to_host(command.data_length); + if data_length > data.count { + return; + } + defer string.advance(data, data_length); + + @CopyPaste + channel := ^peer.channels[command.channel]; + for channel.reliable_windows { + if command.seq_number == it do return; + } + + channel.reliable_windows[channel.reliable_windows_cursor] = command.seq_number; + channel.reliable_windows_cursor += 1; + channel.reliable_windows_cursor %= channel.reliable_windows.count; + + fragment_id := net.network_to_host(command.fragment_id); + fragment: ^Fragmented_Data = null; + for^ channel.pending_fragments { + if it.fragment_id == fragment_id { + fragment = it; + break; + } + } + + if fragment == null { + fragment = array.alloc_one(^channel.pending_fragments); + fragment.fragment_id = fragment_id; + fragment.fragments_remaining = net.network_to_host(command.fragment_count); + memory.alloc_slice(^fragment.data, net.network_to_host(command.total_length)); + } + + memory.copy(fragment.data.data + net.network_to_host(command.fragment_offset), data.data, data_length); + fragment.fragments_remaining -= 1; + + if fragment.fragments_remaining == 0 { + host_notify_message(host, peer, command.channel, fragment.data); + + for iter.as_iterator(^channel.pending_fragments, by_pointer=true) { + if it.fragment_id == fragment_id { + #remove; + break; + } + } + } +} + @Relocate Event :: struct { diff --git a/src/packet.onyx b/src/packet.onyx index a170e3a..ace4186 100644 --- a/src/packet.onyx +++ b/src/packet.onyx @@ -59,7 +59,17 @@ outgoing_command_pack_into_buffer :: (out: ^Outgoing_Command, peer: ^Peer, buf: write((cast(^u8) out.command)[0 .. command_size]); if out.packet != null { - write(out.packet.data); + if command_get_effective(out.command.command) == .Send_Fragment { + // + // Converting to network byte order just to switch back here is a little silly... + fragment := cast(^Protocol_Send_Fragment) out.command; + start := cast(u32) net.network_to_host(fragment.fragment_offset); + end := start + ~~net.network_to_host(fragment.data_length); + + write(out.packet.data[start .. end]); + } else { + write(out.packet.data); + } } return buf[0 .. write_offset], true; diff --git a/src/peer.onyx b/src/peer.onyx index f5a0139..80a2732 100644 --- a/src/peer.onyx +++ b/src/peer.onyx @@ -71,8 +71,10 @@ peer_disconnect :: (peer: ^Peer) { peer_setup_channel :: (peer: ^Peer, channel: ^Channel) { channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4); - channel.seq_number = 0; + channel.outgoing_reliable_seq_number = 0; channel.reliable_windows_cursor = 0; + channel.next_fragment_id = 0; + memory.alloc_slice(^channel.reliable_windows, 32); for ^channel.reliable_windows do *it = 65535; } @@ -130,9 +132,39 @@ peer_send :: (peer: ^Peer, channel_id: Channel_ID, packet: ^Packet) -> bool { channel := ^peer.channels[channel_id]; - @TODO // Fragmented sending if packet.data.count > peer.mtu - sizeof Protocol_Header - sizeof Protocol_Send { - return false; + data_per_packet := peer.mtu - sizeof Protocol_Header - sizeof Protocol_Send_Fragment; + + number_of_segments := packet.data.count / data_per_packet; + if packet.data.count % data_per_packet != 0 do number_of_segments += 1; + + fragment_id := channel.next_fragment_id; + channel.next_fragment_id += 1; + + data_remaining := packet.data.count; + for number_of_segments { + fragment := new(Protocol_Send_Fragment); + fragment.command = .Send_Fragment; + if packet.flags & .Reliable { + fragment.command |= .Flag_Acknowledge; + } + + fragment.channel = channel_id; + fragment.data_length = net.host_to_network(cast(u16) math.min(data_remaining, data_per_packet)); + fragment.fragment_id = net.host_to_network(fragment_id); + fragment.fragment_count = net.host_to_network(number_of_segments); + fragment.fragment_number = net.host_to_network(it); + fragment.total_length = net.host_to_network(packet.data.count); + fragment.fragment_offset = net.host_to_network(packet.data.count - data_remaining); + + if peer_queue_outgoing_command(peer, fragment, packet) == null { + return false; + } + + data_remaining -= cast(u32) net.network_to_host(fragment.data_length); + } + + return true; } command := new(Protocol_Send); @@ -142,9 +174,6 @@ peer_send :: (peer: ^Peer, channel_id: Channel_ID, packet: ^Packet) -> bool { if packet.flags & .Reliable { command.command = .Send_Reliable; command.command |= .Flag_Acknowledge; - } elseif packet.flags & .Unsequenced { - command.command = .Send_Unsequenced; - command.command |= .Flag_Unsequenced; } else { command.command = .Send_Unreliable; } @@ -259,14 +288,19 @@ Channel_ID :: u8; Channel :: struct { id: Channel_ID; - seq_number: u16; reliable_windows: [] u16; reliable_windows_cursor: u32; outgoing_unreliable_seq_number: u16; outgoing_reliable_seq_number: u16; - incoming_reliable_seq_number: u16; -} + next_fragment_id: u16; + pending_fragments: [..] Fragmented_Data; +} +Fragmented_Data :: struct { + fragment_id: u16; + data: [] u8; + fragments_remaining: u32; +} diff --git a/src/protocol.onyx b/src/protocol.onyx index 824ac39..5e08004 100644 --- a/src/protocol.onyx +++ b/src/protocol.onyx @@ -9,7 +9,7 @@ Command :: enum (u8) { Ping :: 5; Send_Reliable :: 6; Send_Unreliable :: 7; - Send_Unsequenced :: 8; + Send_Fragment :: 8; Flag_Acknowledge :: 1 << 6; Flag_Unsequenced :: 1 << 7; @@ -59,6 +59,16 @@ Protocol_Send :: struct #pack { data_length: u16; } +Protocol_Send_Fragment :: struct #pack { + use header: Protocol_Command_Header; + data_length : u16; + fragment_id : u16; + fragment_count : u32; + fragment_number : u32; + total_length : u32; + fragment_offset : u32; +} + Protocol_Disconnect :: Protocol_Command_Header; Protocol_Ping :: Protocol_Command_Header; @@ -71,5 +81,5 @@ command_sizes := u32.[ /* Ping */ sizeof Protocol_Ping, /* Send_Reliable */ sizeof Protocol_Send, /* Send_Unreliable */ sizeof Protocol_Send, - /* Send_Unsequenced */ sizeof Protocol_Send, + /* Send_Fragment */ sizeof Protocol_Send_Fragment, ]; diff --git a/udp_client.onyx b/udp_client.onyx index cb7764d..507545a 100644 --- a/udp_client.onyx +++ b/udp_client.onyx @@ -60,7 +60,7 @@ main :: (args) => { while true { for host->get_events(timeout=100) { - printf("{*}\n", it); + // printf("{*}\n", it); if it.type == .Connection { println("Successfully connected to server!"); @@ -72,12 +72,18 @@ main :: (args) => { } if it.type == .Message { - packet := new(onyx_net.Packet); - packet.flags |= .Reliable; - packet.data = "HELLO"; - - onyx_net.peer_send(it.peer, 0, packet); + print("Server says: "); + print(it.data); + print("\n"); } + + // if it.type == .Message { + // packet := new(onyx_net.Packet); + // packet.flags |= .Reliable; + // packet.data = "HELLO"; + + // onyx_net.peer_send(it.peer, 0, packet); + // } } // if random.between(0, 3) == 0 { diff --git a/udp_server.onyx b/udp_server.onyx index cf076c9..184127a 100644 --- a/udp_server.onyx +++ b/udp_server.onyx @@ -36,7 +36,7 @@ main :: (args) => { if it.data == "What's Up?" { packet := new(onyx_net.Packet); packet.flags |= .Reliable; - packet.data = "Not much"; + packet.data = #file_contents "src/host.onyx"; onyx_net.peer_send(it.peer, 0, packet); } -- 2.25.1