From b0be7f99117d1713ecbca0c6caf4ec995e75e155 Mon Sep 17 00:00:00 2001 From: Brendan Hansen Date: Wed, 30 Mar 2022 16:08:10 -0500 Subject: [PATCH] implemented basic dead peer detection --- src/host.onyx | 64 +++++++++++++++++++++++++++++++++++++++++++---- src/packet.onyx | 4 +-- src/peer.onyx | 49 ++++++++++++++++++++++++++++++------ src/protocol.onyx | 3 ++- udp_client.onyx | 5 ++++ udp_server.onyx | 7 +++++- 6 files changed, 116 insertions(+), 16 deletions(-) diff --git a/src/host.onyx b/src/host.onyx index 6cc260b..11d78c1 100644 --- a/src/host.onyx +++ b/src/host.onyx @@ -9,6 +9,8 @@ Host :: struct { peers : [] Peer; connected_peers: u32; + ping_interval: u32; + no_ack_disconnect_timeout: u32; mtu: u32; current_time: u32; @@ -61,6 +63,8 @@ host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Crea host.addr = *addr; host.mtu = Host_Default_MTU; + host.ping_interval = 3000; + host.no_ack_disconnect_timeout = 10000; for^ peer: host.peers { peer.host = host; @@ -154,8 +158,25 @@ host_send_commands :: (host: ^Host) -> bool { peer_send_acknowledgements(peer); } - // TODO - // Peer pinging + time_diff := math.abs(cast(i64) peer.last_acknowledge_time - cast(i64) host.current_time); + + if peer.state == .Connected { + // Peer pinging + if time_diff >= ~~host.ping_interval { + peer_send_ping(peer); + } + + // Detect a disconnected peer + if time_diff >= ~~host.no_ack_disconnect_timeout { + peer_send_disconnect(peer); + peer_flush_outgoing_commands(peer); + peer_disconnect(peer); + + host_notify_disconnect(host, peer); + continue; + } + } + // Packet loss calcuations // Dropped packet detection @@ -206,7 +227,7 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { return host.event.type != .None; } - while current_data.count > sizeof Protocol_Command_Header { + while current_data.count >= sizeof Protocol_Command_Header { if host.produced_event do break; command := cast(^Protocol_Command_Header) current_data.data; @@ -237,6 +258,7 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { } case .Ping { + // This just requires an acknowledgement } case .Send_Reliable { @@ -268,7 +290,7 @@ host_handle_incoming_commands :: (host: ^Host) -> bool { } } - host.event.peer = peer; + // host.event.peer = peer; return host.produced_event; } @@ -322,6 +344,26 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { #local host_handle_acknowledge_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Acknowledge) { if peer.state == .Disconnected || peer.state == .Zombie do return; + // + // This code was taken straight out of ENet's implementation, and it took me + // a while to understand what was going on here, so let me try to write it + // down for future me. + // + // As an optimization, the entire Unix time is not sent over the network, only + // the lower 16-bits are. This means that in order to compare times, the original + // time must be reconstructed. This is possible given a certain restriction: + // The response occurs within ~30 seconds of the timestamp. Since this is only + // reconstructed on the original machine, a difference of system clocks does + // not matter. + // + // The reason 30 seconds pops up is as follows: 16 bits represents 2^16 milliseconds. + // Given 2^10 milliseconds is about 1000 milliseconds, there are 2^6=64 seconds + // representable. What the code below does is stitch back together the upper 16-bits + // using the current time on the system, but also checks for a roll over case where + // the upper bits rolled over while the message was being processed. If this is the + // case the received time goes back 2^16 milliseconds. I believe this means that + // the response must occur within 30 seconds of the original message sent time, + // but I would have to think about it more to prove that. recv_sent_time := cast(u32) net.network_to_host(command.recv_sent_time); recv_sent_time |= host.current_time & 0xFFFF0000; if (recv_sent_time & 0x8000) == 0x8000 && (host.current_time & 0x8000) == 0 { @@ -334,6 +376,8 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { return; } + peer.last_acknowledge_time = recv_sent_time; + recv_seq_number := net.network_to_host(command.recv_seq_number); removed_command_id := peer_remove_sent_reliable_command(peer, recv_seq_number, command.channel); // printf("Received acknowledgement for {}.\n", removed_command_id); @@ -384,6 +428,7 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { peer.addr = host.received_addr; peer.outgoing_id = net.network_to_host(command.outgoing_peer_id); peer.mtu = net.network_to_host(command.mtu); + peer.last_acknowledge_time = host.current_time; verify := new(Protocol_Verify_Connect); verify.command = .Verify_Connect; @@ -424,8 +469,17 @@ host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) { // // This is slightly misnamed, as it is actually handling a received reliable message. #local host_handle_send_reliable_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send, data: ^[] u8) { + if ~~command.channel > peer.channels.count || (peer.state != .Connected && peer.state != .Disconnect_Later) { + return; + } + + data_length := cast(u32) net.network_to_host(command.data_length); + if data_length > data.count { + return; + } + host_notify_message(host, peer, command.channel, *data); - string.advance(data, ~~ command.data_length); + string.advance(data, data_length); } diff --git a/src/packet.onyx b/src/packet.onyx index 99eca06..a170e3a 100644 --- a/src/packet.onyx +++ b/src/packet.onyx @@ -5,8 +5,8 @@ Outgoing_Command :: struct { packet : ^Packet; reliable_seq_number : u16; unreliable_seq_number : u16; - sent_time : u16; - send_attempts : u16; + sent_time : u32; + send_attempts : u32; pack_into_buffer :: outgoing_command_pack_into_buffer; } diff --git a/src/peer.onyx b/src/peer.onyx index 926be77..e432ffa 100644 --- a/src/peer.onyx +++ b/src/peer.onyx @@ -20,12 +20,28 @@ Peer :: struct { channels: [] Channel; + // The other side should use this id to reference this peer. incoming_id: u16; + + // This should be used in an outgoing messages to the other + // side. This is set up in the initial handshakes. outgoing_id: u16; + + // Magic random number to verify a "Verify_Connect" packet. connect_id: u32; + + // Maximum-Transmission-Unit mtu: u32; + + // Unused window_size: u32; + + // Unix time of last sent message; not necessary last successfully + // delivered message. last_send_time: u32; + + // Unix time of last message that was acknowledge to be received. + last_acknowledge_time: u32; outgoing_reliable_seq_number: u16; incoming_reliable_seq_number: u16; @@ -38,9 +54,9 @@ Peer :: struct { } peer_destroy :: (peer: ^Peer) { - memory.free_slice(^peer.channels); - array.free(^peer.outgoing_commands); - array.free(^peer.incoming_commands); + if peer.channels.data != null do memory.free_slice(^peer.channels); + if peer.outgoing_commands.data != null do array.free(^peer.outgoing_commands); + if peer.incoming_commands.data != null do array.free(^peer.incoming_commands); } peer_disconnect :: (peer: ^Peer) { @@ -131,12 +147,13 @@ peer_queue_acknowledgement :: (peer: ^Peer, command: ^Protocol_Command_Header, s } peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 { - peer.last_send_time = peer.host.current_time; - send_buffer: [65535] u8; total_sent := 0; for peer.outgoing_commands { + it.sent_time = peer.host.current_time; + it.send_attempts += 1; + to_send, success := it->pack_into_buffer(peer, send_buffer); if !success do continue; @@ -146,6 +163,7 @@ peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 { } total_sent += sent_bytes; + peer.last_send_time = peer.host.current_time; if (it.command.command & .Flag_Acknowledge) != 0 { peer.sent_reliable_commands << it; @@ -160,11 +178,13 @@ peer_send_acknowledgements :: (peer: ^Peer) { send_buffer: [65535] u8; for ^ack: peer.acknowledgements { + seq_num := net.host_to_network(ack.command.seq_number); + command: Protocol_Acknowledge; command.command = .Acknowledge; command.channel = ack.command.channel; - command.seq_number = 0; - command.recv_seq_number = net.host_to_network(ack.command.seq_number); + command.seq_number = seq_num; + command.recv_seq_number = seq_num; command.recv_sent_time = net.host_to_network(ack.sent_time); out: Outgoing_Command; @@ -180,6 +200,21 @@ peer_send_acknowledgements :: (peer: ^Peer) { array.clear(^peer.acknowledgements); } +peer_send_ping :: (peer: ^Peer) { + ping := new(Protocol_Ping); + ping.command = .Ping; + ping.command |= .Flag_Acknowledge; + ping.channel = 255; + peer_queue_outgoing_command(peer, ping); +} + +peer_send_disconnect :: (peer: ^Peer) { + disconnect := new(Protocol_Disconnect); + disconnect.command = .Disconnect; + disconnect.channel = 255; + peer_queue_outgoing_command(peer, disconnect); +} + peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channel_ID) -> Command { command: ^Outgoing_Command; index := 0; diff --git a/src/protocol.onyx b/src/protocol.onyx index 76fa352..824ac39 100644 --- a/src/protocol.onyx +++ b/src/protocol.onyx @@ -60,6 +60,7 @@ Protocol_Send :: struct #pack { } Protocol_Disconnect :: Protocol_Command_Header; +Protocol_Ping :: Protocol_Command_Header; command_sizes := u32.[ /* None */ 0, @@ -67,7 +68,7 @@ command_sizes := u32.[ /* Connect */ sizeof Protocol_Connect, /* Verify_Connect */ sizeof Protocol_Verify_Connect, /* Disconnect */ sizeof Protocol_Disconnect, - /* Ping */ 0, + /* Ping */ sizeof Protocol_Ping, /* Send_Reliable */ sizeof Protocol_Send, /* Send_Unreliable */ sizeof Protocol_Send, /* Send_Unsequenced */ sizeof Protocol_Send, diff --git a/udp_client.onyx b/udp_client.onyx index d721fb9..a31f049 100644 --- a/udp_client.onyx +++ b/udp_client.onyx @@ -34,6 +34,11 @@ main :: (args) => { println("Successfully connected to server!"); } + if it.type == .Disconnection { + println("Lost connection to server."); + break break; + } + if it.type == .Message { packet := new(onyx_net.Packet); packet.flags |= .Reliable; diff --git a/udp_server.onyx b/udp_server.onyx index 90e924e..e14fc67 100644 --- a/udp_server.onyx +++ b/udp_server.onyx @@ -17,9 +17,10 @@ main :: (args) => { while true { for server->get_events(timeout = 2000) { - printf("{*}\n", it); + // printf("{*}\n", it); if it.type == .Connection { + printf("New connection from: {}\n", it.peer.addr); packet := new(onyx_net.Packet); packet.flags |= .Reliable; packet.data = "Welcome!!!"; @@ -27,6 +28,10 @@ main :: (args) => { onyx_net.host_broadcast(server, 0, packet); } + if it.type == .Disconnection { + printf("Disconnection: {} {}\n", it.peer.addr, it.peer); + } + if it.type == .Message { if it.data == "What's Up?" { packet := new(onyx_net.Packet); -- 2.25.1