host.mtu = Host_Default_MTU;
host.ping_interval = 3000;
host.no_ack_disconnect_timeout = 10000;
+ host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF);
for^ peer: host.peers {
peer.host = host;
peer.incoming_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
peer.outgoing_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
peer.mtu = host.mtu;
+ peer.no_ack_resend_timeout = 50;
}
return host, .None;
peer.addr = *addr;
peer.connect_id = random.int();
peer.mtu = Host_Default_MTU;
+ peer.last_acknowledge_time = host.current_time;
- for^ channel: peer.channels {
- channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
- channel.seq_number = 0;
- channel.max_reliable_windows = 16;
- memory.alloc_slice(^channel.reliable_windows, channel.max_reliable_windows);
- }
+ for^ peer.channels do peer_setup_channel(peer, it);
command := new(Protocol_Connect);
command.command = .Connect;
host.produced_event = false;
host.current_time = ~~ (os.time() & cast(u64) 0xFFFFFFFF);
- sent_command := host_send_commands(host);
- received_command := host_receive_commands(host, timeout);
+ host_send_commands(host);
+ host_receive_commands(host, timeout);
return host.produced_event;
}
time_diff := math.abs(cast(i64) peer.last_acknowledge_time - cast(i64) host.current_time);
- if peer.state == .Connected {
- // Peer pinging
- if time_diff >= ~~host.ping_interval {
- peer_send_ping(peer);
- }
+ // Peer pinging
+ if time_diff >= ~~host.ping_interval {
+ peer_send_ping(peer);
+ }
- // Detect a disconnected peer
- if time_diff >= ~~host.no_ack_disconnect_timeout {
- peer_send_disconnect(peer);
- peer_flush_outgoing_commands(peer);
- peer_disconnect(peer);
+ // Detect a disconnected peer
+ if time_diff >= ~~host.no_ack_disconnect_timeout {
+ peer_send_disconnect(peer);
+ peer_flush_outgoing_commands(peer);
+ peer_disconnect(peer);
- host_notify_disconnect(host, peer);
- continue;
- }
+ host_notify_disconnect(host, peer);
+ continue;
}
// Packet loss calcuations
+
+
// Dropped packet detection
+ peer_check_outgoing_commands(peer);
+ // Sending commands
peer_flush_outgoing_commands(peer);
}
}
}
case .Disconnect {
+ if peer == null do return_block();
+ host_handle_disconnect_command(host, peer, ~~ command);
}
case .Ping {
// This just requires an acknowledgement
}
- case .Send_Reliable {
+ case .Send_Reliable, .Send_Unreliable {
if peer == null do return_block();
host_handle_send_reliable_command(host, peer, ~~ command, ^current_data);
}
- case .Send_Unreliable {
- }
-
case .Send_Unsequenced {
}
}
channel_count := net.network_to_host(command.channel_count);
memory.alloc_slice(^peer.channels, channel_count);
- memory.set(peer.channels.data, 0, channel_count * sizeof Channel);
+ for ^peer.channels do peer_setup_channel(peer, it);
peer.state = .Acknowledging_Connection;
peer.connect_id = net.network_to_host(command.connect_id);
return;
}
+#local host_handle_disconnect_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Disconnect) {
+ peer.state = .Disconnected;
+ host_notify_disconnect(host, peer);
+}
+
//
// This is slightly misnamed, as it is actually handling a received reliable message.
#local host_handle_send_reliable_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send, data: ^[] u8) {
if data_length > data.count {
return;
}
+ defer string.advance(data, data_length);
+
+ if command_get_effective(command.command) == .Send_Reliable {
+ channel := ^peer.channels[command.channel];
+ for channel.reliable_windows {
+ if command.seq_number == it do return;
+ }
+
+ channel.reliable_windows[channel.reliable_windows_cursor] = command.seq_number;
+ channel.reliable_windows_cursor += 1;
+ channel.reliable_windows_cursor %= channel.reliable_windows.count;
+ }
host_notify_message(host, peer, command.channel, *data);
- string.advance(data, data_length);
}
string :: package core.string
random :: package core.random
os :: package core.os
+ math :: package core.math
+ iter :: package core.iter
}
#load_all "./."
// Unix time of last message that was acknowledge to be received.
last_acknowledge_time: u32;
+ no_ack_resend_timeout: u32;
+
outgoing_reliable_seq_number: u16;
incoming_reliable_seq_number: u16;
outgoing_unsequenced_group: u32;
peer_destroy(peer);
}
+peer_setup_channel :: (peer: ^Peer, channel: ^Channel) {
+ channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
+ channel.seq_number = 0;
+ channel.reliable_windows_cursor = 0;
+ memory.alloc_slice(^channel.reliable_windows, 32);
+ for ^channel.reliable_windows do *it = 65535;
+}
+
peer_queue_outgoing_command :: #match {
(peer: ^Peer, command: ^Protocol_Command_Header, packet: ^Packet = null) -> ^Outgoing_Command {
out := new(Outgoing_Command);
peer.acknowledgements << .{ sent_time, command };
}
+peer_check_outgoing_commands :: (peer: ^Peer) {
+ for iter.as_iterator(^peer.sent_reliable_commands) {
+ time_diff := math.abs(cast(i64) it.sent_time - cast(i64) peer.host.current_time);
+ if time_diff >= ~~ peer.no_ack_resend_timeout {
+ peer.outgoing_commands << it;
+ #remove;
+ }
+ }
+}
+
peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 {
send_buffer: [65535] u8;
peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channel_ID) -> Command {
command: ^Outgoing_Command;
- index := 0;
- for peer.sent_reliable_commands {
+ for iter.as_iterator(^peer.sent_reliable_commands) {
if it.reliable_seq_number == seq_num && it.command.channel == channel {
command = it;
+ #remove;
break;
}
-
- index += 1;
}
if command == null do return .None;
defer {
cfree(command.command);
cfree(command);
- array.fast_delete(^peer.sent_reliable_commands, index);
}
return command_get_effective(command.command.command);
}
seq_number: u16;
reliable_windows: [] u16;
- max_reliable_windows: u32;
+ reliable_windows_cursor: u32;
outgoing_unreliable_seq_number: u16;
outgoing_reliable_seq_number: u16;
incoming_reliable_seq_number: u16;
-
}
addr.port = 8080;
peer := onyx_net.host_connect(host, ^addr, 2);
+ input_thread: thread.Thread;
+ td_type :: struct { peer: ^onyx_net.Peer; }
+ td := td_type.{ peer };
+
+ thread.spawn(^input_thread, ^td, (data) => {
+ peer := data.peer;
+ input_reader := io.reader_make(^stdin);
+
+ while true {
+ line := io.read_line(^input_reader) |> string.strip_whitespace();
+
+ switch line {
+ case "disconnect" {
+ println("Disconnecting");
+ onyx_net.peer_send_disconnect(peer);
+ onyx_net.peer_flush_outgoing_commands(peer);
+ onyx_net.peer_disconnect(peer);
+ break;
+ }
+
+ case "send" {
+ packet := new(onyx_net.Packet);
+ packet.flags |= .Reliable;
+ packet.data = "What's Up?";
+ println("Sending what's up...");
+
+ onyx_net.peer_send(peer, 0, packet);
+ }
+ }
+ }
+ });
+
while true {
- for host->get_events(timeout=500) {
+ for host->get_events(timeout=100) {
printf("{*}\n", it);
if it.type == .Connection {
}
}
- if random.between(0, 3) == 0 {
- packet := new(onyx_net.Packet);
- packet.flags |= .Reliable;
- packet.data = "What's Up?";
- println("Sending what's up...");
+ // if random.between(0, 3) == 0 {
+ // packet := new(onyx_net.Packet);
+ // packet.flags |= .Reliable;
+ // packet.data = "What's Up?";
+ // println("Sending what's up...");
- onyx_net.peer_send(peer, 0, packet);
- }
+ // onyx_net.peer_send(peer, 0, packet);
+ // }
+
+ // if random.between(0, 100) == 4 {
+ // println("Disconnecting");
+ // onyx_net.peer_send_disconnect(peer);
+ // onyx_net.peer_flush_outgoing_commands(peer);
+ // onyx_net.peer_disconnect(peer);
+ // break;
+ // }
}
}
}
if it.type == .Disconnection {
- printf("Disconnection: {} {}\n", it.peer.addr, it.peer);
+ printf("Disconnection: {}\n", it.peer.addr);
}
if it.type == .Message {