From a6b52e60aeb09d48c0e86c87a63e72860619e744 Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Fri, 1 Apr 2022 16:42:53 -0500 Subject: [PATCH] added disconnection and resending commands --- src/host.onyx | 69 ++++++++++++++++++++++++++++++------------------- src/module.onyx | 2 ++ src/peer.onyx | 30 ++++++++++++++++----- udp_client.onyx | 56 +++++++++++++++++++++++++++++++++------ udp_server.onyx | 2 +- 5 files changed, 116 insertions(+), 43 deletions(-) diff --git a/src/host.onyx b/src/host.onyx index 11d78c1..cbcdb80 100644 --- a/src/host.onyx +++ b/src/host.onyx @@ -65,6 +65,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea host.mtu = Host_Default_MTU; host.ping_interval = 3000; host.no_ack_disconnect_timeout = 10000; + host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF); for^ peer: host.peers { peer.host = host; @@ -73,6 +74,7 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea peer.incoming_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer); peer.outgoing_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer); peer.mtu = host.mtu; + peer.no_ack_resend_timeout = 50; } return host, .None; @@ -95,13 +97,9 @@ host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) -> peer.addr = *addr; peer.connect_id = random.int(); peer.mtu = Host_Default_MTU; + peer.last_acknowledge_time = host.current_time; - for^ channel: peer.channels { - channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4); - channel.seq_number = 0; - channel.max_reliable_windows = 16; - memory.alloc_slice(^channel.reliable_windows, channel.max_reliable_windows); - } + for^ peer.channels do peer_setup_channel(peer, it); command := new(Protocol_Connect); command.command = .Connect; @@ -135,8 +133,8 @@ host_pulse :: (host: ^Host, timeout: u32 = 0) -> bool { host.produced_event = false; host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF); - sent_command := host_send_commands(host); - received_command := host_receive_commands(host, timeout); + host_send_commands(host); + host_receive_commands(host, timeout); return host.produced_event; } @@ -160,26 +158,28 @@ host_send_commands :: (host: ^Host) -> bool { time_diff := math.abs(cast(i64) peer.last_acknowledge_time - cast(i64) host.current_time); - if peer.state == .Connected { - // Peer pinging - if time_diff >= ~~host.ping_interval { - peer_send_ping(peer); - } + // Peer pinging + if time_diff >= ~~host.ping_interval { + peer_send_ping(peer); + } - // Detect a disconnected peer - if time_diff >= ~~host.no_ack_disconnect_timeout { - peer_send_disconnect(peer); - peer_flush_outgoing_commands(peer); - peer_disconnect(peer); + // Detect a disconnected peer + if time_diff >= ~~host.no_ack_disconnect_timeout { + peer_send_disconnect(peer); + peer_flush_outgoing_commands(peer); + peer_disconnect(peer); - host_notify_disconnect(host, peer); - continue; - } + host_notify_disconnect(host, peer); + continue; } // Packet loss calcuations + + // Dropped packet detection + peer_check_outgoing_commands(peer); + // Sending commands peer_flush_outgoing_commands(peer); } } @@ -255,20 +255,19 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { } case .Disconnect { + if peer == null do return_block(); + host_handle_disconnect_command(host, peer, ~~ command); } case .Ping { // This just requires an acknowledgement } - case .Send_Reliable { + case .Send_Reliable, .Send_Unreliable { if peer == null do return_block(); host_handle_send_reliable_command(host, peer, ~~ command, ^current_data); } - case .Send_Unreliable { - } - case .Send_Unsequenced { } } @@ -421,7 +420,7 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { channel_count := net.network_to_host(command.channel_count); memory.alloc_slice(^peer.channels, channel_count); - memory.set(peer.channels.data, 0, channel_count * sizeof Channel); + for ^peer.channels do peer_setup_channel(peer, it); peer.state = .Acknowledging_Connection; peer.connect_id = net.network_to_host(command.connect_id); @@ -466,6 +465,11 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { return; } +#local host_handle_disconnect_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Disconnect) { + peer.state = .Disconnected; + host_notify_disconnect(host, peer); +} + // // This is slightly misnamed, as it is actually handling a received reliable message. #local host_handle_send_reliable_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send, data: ^[] u8) { @@ -477,9 +481,20 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { if data_length > data.count { return; } + defer string.advance(data, data_length); + + if command_get_effective(command.command) == .Send_Reliable { + channel := ^peer.channels[command.channel]; + for channel.reliable_windows { + if command.seq_number == it do return; + } + + channel.reliable_windows[channel.reliable_windows_cursor] = command.seq_number; + channel.reliable_windows_cursor += 1; + channel.reliable_windows_cursor %= channel.reliable_windows.count; + } host_notify_message(host, peer, command.channel, *data); - string.advance(data, data_length); } diff --git a/src/module.onyx b/src/module.onyx index 6db6c8a..4bee62e 100644 --- a/src/module.onyx +++ b/src/module.onyx @@ -9,6 +9,8 @@ package onyx_net string :: package core.string random :: package core.random os :: package core.os + math :: package core.math + iter :: package core.iter } #load_all "./." diff --git a/src/peer.onyx b/src/peer.onyx index e432ffa..f5a0139 100644 --- a/src/peer.onyx +++ b/src/peer.onyx @@ -43,6 +43,8 @@ Peer :: struct { // Unix time of last message that was acknowledge to be received. last_acknowledge_time: u32; + no_ack_resend_timeout: u32; + outgoing_reliable_seq_number: u16; incoming_reliable_seq_number: u16; outgoing_unsequenced_group: u32; @@ -67,6 +69,14 @@ peer_disconnect :: (peer: ^Peer) { peer_destroy(peer); } +peer_setup_channel :: (peer: ^Peer, channel: ^Channel) { + channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4); + channel.seq_number = 0; + channel.reliable_windows_cursor = 0; + memory.alloc_slice(^channel.reliable_windows, 32); + for ^channel.reliable_windows do *it = 65535; +} + peer_queue_outgoing_command :: #match { (peer: ^Peer, command: ^Protocol_Command_Header, packet: ^Packet = null) -> ^Outgoing_Command { out := new(Outgoing_Command); @@ -146,6 +156,16 @@ peer_queue_acknowledgement :: (peer: ^Peer, command: ^Protocol_Command_Header, s peer.acknowledgements << .{ sent_time, command }; } +peer_check_outgoing_commands :: (peer: ^Peer) { + for iter.as_iterator(^peer.sent_reliable_commands) { + time_diff := math.abs(cast(i64) it.sent_time - cast(i64) peer.host.current_time); + if time_diff >= ~~ peer.no_ack_resend_timeout { + peer.outgoing_commands << it; + #remove; + } + } +} + peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 { send_buffer: [65535] u8; @@ -217,14 +237,12 @@ peer_send_disconnect :: (peer: ^Peer) { peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channel_ID) -> Command { command: ^Outgoing_Command; - index := 0; - for peer.sent_reliable_commands { + for iter.as_iterator(^peer.sent_reliable_commands) { if it.reliable_seq_number == seq_num && it.command.channel == channel { command = it; + #remove; break; } - - index += 1; } if command == null do return .None; @@ -232,7 +250,6 @@ peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channe defer { cfree(command.command); cfree(command); - array.fast_delete(^peer.sent_reliable_commands, index); } return command_get_effective(command.command.command); } @@ -245,12 +262,11 @@ Channel :: struct { seq_number: u16; reliable_windows: [] u16; - max_reliable_windows: u32; + reliable_windows_cursor: u32; outgoing_unreliable_seq_number: u16; outgoing_reliable_seq_number: u16; incoming_reliable_seq_number: u16; - } diff --git a/udp_client.onyx b/udp_client.onyx index a31f049..cb7764d 100644 --- a/udp_client.onyx +++ b/udp_client.onyx @@ -26,8 +26,40 @@ main :: (args) => { addr.port = 8080; peer := onyx_net.host_connect(host, ^addr, 2); + input_thread: thread.Thread; + td_type :: struct { peer: ^onyx_net.Peer; } + td := td_type.{ peer }; + + thread.spawn(^input_thread, ^td, (data) => { + peer := data.peer; + input_reader := io.reader_make(^stdin); + + while true { + line := io.read_line(^input_reader) |> string.strip_whitespace(); + + switch line { + case "disconnect" { + println("Disconnecting"); + onyx_net.peer_send_disconnect(peer); + onyx_net.peer_flush_outgoing_commands(peer); + onyx_net.peer_disconnect(peer); + break; + } + + case "send" { + packet := new(onyx_net.Packet); + packet.flags |= .Reliable; + packet.data = "What's Up?"; + println("Sending what's up..."); + + onyx_net.peer_send(peer, 0, packet); + } + } + } + }); + while true { - for host->get_events(timeout=500) { + for host->get_events(timeout=100) { printf("{*}\n", it); if it.type == .Connection { @@ -48,13 +80,21 @@ main :: (args) => { } } - if random.between(0, 3) == 0 { - packet := new(onyx_net.Packet); - packet.flags |= .Reliable; - packet.data = "What's Up?"; - println("Sending what's up..."); + // if random.between(0, 3) == 0 { + // packet := new(onyx_net.Packet); + // packet.flags |= .Reliable; + // packet.data = "What's Up?"; + // println("Sending what's up..."); - onyx_net.peer_send(peer, 0, packet); - } + // onyx_net.peer_send(peer, 0, packet); + // } + + // if random.between(0, 100) == 4 { + // println("Disconnecting"); + // onyx_net.peer_send_disconnect(peer); + // onyx_net.peer_flush_outgoing_commands(peer); + // onyx_net.peer_disconnect(peer); + // break; + // } } } diff --git a/udp_server.onyx b/udp_server.onyx index e14fc67..cf076c9 100644 --- a/udp_server.onyx +++ b/udp_server.onyx @@ -29,7 +29,7 @@ main :: (args) => { } if it.type == .Disconnection { - printf("Disconnection: {} {}\n", it.peer.addr, it.peer); + printf("Disconnection: {}\n", it.peer.addr); } if it.type == .Message { -- 2.25.1