package onyx_net
+@CLEANUP
+use package core
+
Host :: struct {
socket : net.Socket;
addr : net.Socket_Address;
peers : [] Peer;
- maximum_peers: u32;
+ connected_peers: u32;
+
+ mtu: u32;
+ current_time: u32;
+
+ // Transient data used during processing of an message.
+ packet_data: [2] [Host_Default_MTU] u8;
+ event: Event;
+ produced_event: bool;
+
+ received_data: [] u8;
+ received_addr: net.Socket_Address;
+
+ pulse :: host_pulse;
+ get_events :: host_get_events;
+}
+
+#local Host_Creation_Error :: enum {
+ None;
+ Socket_Creation_Failed;
+ Socket_Binding_Failed;
+}
+
+host_create :: (addr: ^net.Socket_Address, peer_count: u32) -> (^Host, Host_Creation_Error) {
+ errored := false;
+ host := new(Host);
+ defer if errored do cfree(host);
+
+ memory.alloc_slice(^host.peers, peer_count);
+ defer if errored do memory.free_slice(^host.peers);
+ host.connected_peers = 0;
+
+ if socket, err := net.socket_create(.Inet, .Dgram); err != .None {
+ errored = true;
+ return null, .Socket_Creation_Failed;
+
+ } else {
+ host.socket = socket;
+ }
+ defer if errored do net.socket_close(^host.socket);
+
+ if addr != null {
+ if !host.socket->bind(addr.port) {
+ errored = true;
+ return null, .Socket_Binding_Failed;
+ }
+ }
+
+ host.socket->setting(.NonBlocking, 1);
+ host.socket->setting(.Broadcast, 1);
+
+ host.addr = *addr;
+ host.mtu = Host_Default_MTU;
+
+ for^ peer: host.peers {
+ peer.host = host;
+ peer.state = .Disconnected;
+ peer.channels = .{ null, 0 };
+ peer.incoming_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
+ peer.outgoing_id = ~~((cast(u32) peer - cast(u32) host.peers.data) / sizeof Peer);
+ peer.mtu = host.mtu;
+ }
+
+ return host, .None;
+}
+
+host_connect :: (host: ^Host, addr: ^net.Socket_Address, channel_count: u32) -> ^Peer {
+ peer: ^Peer;
+
+ // Find first peer that is in the disconnected state.
+ for^ host.peers do if it.state == .Disconnected {
+ peer = it;
+ break;
+ }
+
+ if peer == null do return null;
+
+ memory.alloc_slice(^peer.channels, channel_count);
+ peer.host = host;
+ peer.state = .Connecting;
+ peer.addr = *addr;
+ peer.connect_id = random.int();
+ peer.mtu = Host_Default_MTU;
+
+ for^ channel: peer.channels {
+ channel.id = ~~((cast(u32) channel - cast(u32) peer.channels.data) / 4);
+ channel.seq_number = 0;
+ channel.max_reliable_windows = 16;
+ memory.alloc_slice(^channel.reliable_windows, channel.max_reliable_windows);
+ }
+
+ command := new(Protocol_Connect);
+ command.command = .Connect;
+ command.command |= .Flag_Acknowledge;
+ command.channel = 255;
+ command.outgoing_peer_id = net.host_to_network(peer.outgoing_id);
+ command.mtu = net.host_to_network(peer.mtu);
+ command.window_size = 0;
+ command.channel_count = net.host_to_network(channel_count);
+ command.connect_id = net.host_to_network(peer.connect_id);
+
+ peer_queue_outgoing_command(peer, command);
+ return peer;
+}
+
+host_free :: (host: ^Host) {
+ net.socket_close(^host.socket);
+ memory.free_slice(^host.peers);
+ cfree(host);
+}
+
+host_broadcast :: (host: ^Host, channel: Channel_ID, packet: ^Packet) {
+ for ^peer: host.peers {
+ if peer.state != .Connected do continue;
+
+ peer_send(peer, channel, packet);
+ }
+}
+
+host_pulse :: (host: ^Host, timeout: u32 = 0) -> bool {
+ host.current_time = ~~ os.time();
+
+ sent_command := host_send_commands(host);
+ received_command := host_receive_commands(host, timeout);
+
+ return host.produced_event;
+}
+
+host_send_commands :: (host: ^Host) -> bool {
+ for ^peer: host.peers {
+ //
+ // Skip over peers that are dead or disconnected.
+ if peer.state == .Disconnected || peer.state == .Zombie {
+ continue;
+ }
+
+ //
+ // Have peers send their acknowledgements for messages they received.
+ // Currently, this will issue its own send() call. A better way is to
+ // pool the messages together using something like sendmsg(), but Onyx
+ // does not currently support that.
+ if peer.acknowledgements.count > 0 {
+ peer_send_acknowledgements(peer);
+ }
+
+ // TODO
+ // Peer pinging
+ // Packet loss calcuations
+ // Dropped packet detection
+
+ peer_flush_outgoing_commands(peer);
+ }
+}
+
+host_receive_commands :: (host: ^Host, timeout: u32 = 0) -> bool {
+ if timeout > 0 {
+ check_sockets := (^net.Socket).[ ^host.socket ];
+ changed_buffer := alloc.array_from_stack(i32, 1);
+ changed := net.socket_poll_all(check_sockets, timeout, changed_buffer);
+
+ if changed.count == 0 do return false;
+ }
+
+ buffer: [] u8 = host.packet_data[0];
+ recv_addr, recv_bytes := host.socket ->recvfrom(buffer);
+
+ host.received_addr = recv_addr;
+ host.received_data = buffer[0 .. recv_bytes];
+
+ return host_handle_incoming_commands(host);
+}
+
+host_handle_incoming_commands :: (host: ^Host) -> bool {
+ header: ^Protocol_Header = ~~ host.received_data.data;
+ current_data := host.received_data;
+
+ peer_id := cast(u32) net.network_to_host(header.peer_id);
+ peer: ^Peer;
+
+ if peer_id == 0xffff --- // a Peer Id of 0xffff is used for a newly connecting peer.
+ elseif peer_id >= host.peers.count || peer_id >= host.peers.count {
+ return false;
+
+ } else {
+ peer = ^host.peers[peer_id];
+ }
+
+ if peer != null {
+ peer.addr = host.received_addr;
+ }
+
+ string.advance(^current_data, sizeof typeof *header);
+
+ return_block :: macro () {
+ printf("DISCARDING: {}\n", peer_id);
+ return host.event.type != .None;
+ }
+
+ while current_data.count > sizeof Protocol_Command_Header {
+ if host.produced_event do break;
+
+ command := cast(^Protocol_Command_Header) current_data.data;
+ command_id := command_get_effective(command.command);
+ printf("Received command: {} {*p}\n", command_sizes[command_id], cast(Command) command_id, command);
+ string.advance(^current_data, command_sizes[command_id]);
+
+ switch command_id {
+ case .Acknowledge {
+ if peer == null do return_block();
+ host_handle_acknowledge_command(host, peer, ~~ command);
+ }
+
+ case .Connect {
+ if peer != null do return_block();
+ peer = host_handle_connect_command(host, ~~ command);
+ if peer == null do return_block();
+ }
+
+ case .Verify_Connect {
+ if peer == null do return_block();
+ host_handle_verify_connect_command(host, peer, ~~ command);
+ }
+
+ case .Disconnect {
+ }
+
+ case .Ping {
+ }
+
+ case .Send_Reliable {
+ if peer == null do return_block();
+ host_handle_send_reliable_command(host, peer, ~~ command, ^current_data);
+ }
+
+ case .Send_Unreliable {
+ }
+
+ case .Send_Unsequenced {
+ }
+ }
+
+ if peer != null && (command.command & .Flag_Acknowledge) != 0 {
+ sent_time := net.network_to_host(header.sent_time);
+
+ switch peer.state {
+ case .Acknowledging_Disconnect {
+ if command_id == .Disconnect {
+ peer_queue_acknowledgement(peer, command, sent_time);
+ }
+ }
+
+ case #default {
+ peer_queue_acknowledgement(peer, command, sent_time);
+ }
+ }
+ }
+ }
+
+ host.event.peer = peer;
+ return host.produced_event;
+}
+
+host_get_events :: (host: ^Host, timeout: u32 = 0) -> Iterator(^Event) {
+ Context :: struct {
+ host: ^Host;
+ timeout: u32 = 0;
+ }
+
+ next :: (use ctx: ^Context) -> (^Event, bool) {
+ if host_pulse(host, timeout) {
+ host.produced_event = false;
+ return ^host.event, true;
+ }
+
+ return null, false;
+ }
+
+ ctx := new(Context);
+ ctx.host = host;
+ ctx.timeout = timeout;
+ return .{ ctx, next, null_proc };
+}
+
+#local host_notify_connect :: (host: ^Host, peer: ^Peer) {
+ host.event.type = .Connection;
+ host.event.peer = peer;
+ host.event.channel_id = 255; // Magic constant
+ host.event.data = .[];
+
+ host.produced_event = true;
+}
+
+#local host_notify_disconnect :: (host: ^Host, peer: ^Peer) {
+ host.event.type = .Disconnection;
+ host.event.peer = peer;
+ host.event.channel_id = 255; // Magic constant
+ host.event.data = .[];
+
+ host.produced_event = true;
+}
+
+#local host_notify_message :: (host: ^Host, peer: ^Peer, channel_id: u8, data: [] u8) {
+ host.event.type = .Message;
+ host.event.peer = peer;
+ host.event.channel_id = channel_id;
+ host.event.data = data;
+
+ host.produced_event = true;
+}
+
+#local host_handle_acknowledge_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Acknowledge) {
+ if peer.state == .Disconnected || peer.state == .Zombie do return;
+
+ recv_sent_time := cast(u32) net.network_to_host(command.recv_sent_time);
+ recv_sent_time |= host.current_time & 0xFFFF0000;
+ if (recv_sent_time & 0x8000) > (host.current_time & 0x8000) {
+ recv_sent_time -= 0x10000;
+ }
+
+ //
+ // If the acknowledgement is for a message in the future, ignore it.
+ if host.current_time < recv_sent_time {
+ return;
+ }
+
+ recv_seq_number := net.network_to_host(command.recv_seq_number);
+ removed_command_id := peer_remove_sent_reliable_command(peer, recv_seq_number, command.channel);
+ printf("Received acknowledgement for {}.\n", removed_command_id);
+
+ switch peer.state {
+ case .Acknowledging_Connection {
+ if removed_command_id == .Verify_Connect {
+ peer.state = .Connected;
+ host_notify_connect(host, peer);
+ }
+ }
+
+ case .Disconnecting {
+ if removed_command_id == .Disconnect {
+ host_notify_disconnect(host, peer);
+ }
+ }
+
+ case .Disconnect_Later {
+ if peer.outgoing_commands.count == 0 {
+ peer_disconnect(peer);
+ }
+ }
+ }
+}
+
+#local host_handle_connect_command :: (host: ^Host, command: ^Protocol_Connect) -> ^Peer {
+ peer: ^Peer;
+
+ for^ host.peers {
+ if it.state == .Disconnected {
+ peer = it;
+ break;
+
+ } elseif it.state == .Connecting && it.addr.addr == host.received_addr.addr {
+ if it.addr.port == host.received_addr.port && it.connect_id == command.connect_id do return null;
+ }
+ }
+
+ if peer == null do return null;
+
+ channel_count := net.network_to_host(command.channel_count);
+ memory.alloc_slice(^peer.channels, channel_count);
+ memory.set(peer.channels.data, 0, channel_count * sizeof Channel);
+
+ peer.state = .Acknowledging_Connection;
+ peer.connect_id = net.network_to_host(command.connect_id);
+ peer.addr = host.received_addr;
+ peer.outgoing_id = net.network_to_host(command.outgoing_peer_id);
+ peer.mtu = net.network_to_host(command.mtu);
+
+ verify := new(Protocol_Verify_Connect);
+ verify.command = .Verify_Connect;
+ verify.command |= .Flag_Acknowledge;
+ verify.channel = 255;
+ verify.channel_count = net.host_to_network(peer.channels.count);
+ verify.outgoing_peer_id = net.host_to_network(peer.incoming_id);
+ verify.mtu = net.host_to_network(peer.mtu);
+ verify.window_size = command.window_size;
+ verify.connect_id = net.host_to_network(peer.connect_id);
+
+ peer_queue_outgoing_command(peer, verify);
+
+ return peer;
}
+#local host_handle_verify_connect_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Verify_Connect) {
+ if net.network_to_host(command.connect_id) != peer.connect_id do return;
+ channel_count := net.network_to_host(command.channel_count);
+
+ //
+ // These magic constants refer to the fact that the connection
+ // packet will have sequence number 1, and channel id 255.
+ peer_remove_sent_reliable_command(peer, 1, 255);
+
+ peer.channels.count = math.min(peer.channels.count, channel_count);
+ peer.outgoing_id = net.network_to_host(command.outgoing_peer_id);
+
+ peer.mtu = math.min(peer.mtu, net.network_to_host(command.mtu));
+ peer.window_size = math.min(peer.window_size, net.network_to_host(command.window_size));
+
+ peer.state = .Connected;
+ host_notify_connect(host, peer);
+ return;
+}
+
+//
+// This is slightly misnamed, as it is actually handling a received reliable message.
+#local host_handle_send_reliable_command :: (host: ^Host, peer: ^Peer, command: ^Protocol_Send, data: ^[] u8) {
+ host_notify_message(host, peer, command.channel, *data);
+ string.advance(data, ~~ command.data_length);
+}
+
+
+@Relocate
+Event :: struct {
+ Type :: enum {
+ None;
+ Connection;
+ Disconnection;
+ Message;
+ }
+
+ type: Type;
+ peer: ^Peer;
+ channel_id: Channel_ID;
+ data: [] u8;
+}
+
+#local {
+ Host_Default_MTU :: 1400
+}
Acknowledging_Connection;
Connection_Pending;
Connection_Successed;
- Connection;
+ Connected;
Disconnect_Later;
Disconnecting;
Acknowledging_Disconnect;
state: Peer_State;
channels: [] Channel;
+
+ incoming_id: u16;
+ outgoing_id: u16;
+ connect_id: u32;
+ mtu: u32;
+ window_size: u32;
+ last_send_time: u32;
+
+ outgoing_reliable_seq_number: u16;
+ incoming_reliable_seq_number: u16;
+ outgoing_unsequenced_group: u32;
+
+ outgoing_commands: [..] ^Outgoing_Command;
+ incoming_commands: [..] ^Incoming_Command;
+ acknowledgements: [..] Acknowledgement;
+ sent_reliable_commands: [..] ^Outgoing_Command;
+}
+
+peer_destroy :: (peer: ^Peer) {
+ memory.free_slice(^peer.channels);
+ array.free(^peer.outgoing_commands);
+ array.free(^peer.incoming_commands);
+}
+
+peer_disconnect :: (peer: ^Peer) {
+ if peer.state == .Disconnected do return;
+
+ peer.state = .Disconnected;
+
+ peer_destroy(peer);
+}
+
+peer_queue_outgoing_command :: #match {
+ (peer: ^Peer, command: ^Protocol_Command_Header, packet: ^Packet = null) -> ^Outgoing_Command {
+ out := new(Outgoing_Command);
+ out.command = command;
+ out.packet = packet;
+
+ peer_setup_outgoing_command(peer, out);
+ return out;
+ }
+}
+
+peer_setup_outgoing_command :: (peer: ^Peer, command: ^Outgoing_Command) {
+ if command.command.channel == 255 {
+ peer.outgoing_reliable_seq_number += 1;
+ command.reliable_seq_number = peer.outgoing_reliable_seq_number;
+ command.unreliable_seq_number = 0;
+
+ } else {
+ channel := ^peer.channels[command.command.channel];
+
+ // Oof... That's a long chain of command.
+ if (command.command.command & .Flag_Acknowledge) != 0 {
+ channel.outgoing_reliable_seq_number += 1;
+ command.reliable_seq_number = channel.outgoing_reliable_seq_number;
+ command.unreliable_seq_number = 0;
+
+ } elseif (command.command.command & .Flag_Unsequenced) != 0 {
+ peer.outgoing_unsequenced_group += 1;
+ command.reliable_seq_number = 0;
+ command.unreliable_seq_number = 0;
+
+ } else {
+ channel.outgoing_unreliable_seq_number += 1;
+
+ command.reliable_seq_number = channel.outgoing_reliable_seq_number;
+ command.unreliable_seq_number = channel.outgoing_unreliable_seq_number;
+ }
+ }
+
+ command.send_attempts = 0;
+ command.sent_time = 0;
+ command.command.seq_number = net.host_to_network(command.reliable_seq_number);
+
+ peer.outgoing_commands << command;
+}
+
+peer_send :: (peer: ^Peer, channel_id: Channel_ID, packet: ^Packet) -> bool {
+ if peer.state != .Connected || ~~channel_id > peer.channels.count do return false;
+
+ channel := ^peer.channels[channel_id];
+
+ @TODO // Fragmented sending
+ if packet.data.count > peer.mtu - sizeof Protocol_Header - sizeof Protocol_Send do return false;
+
+ command := new(Protocol_Send);
+ command.channel = channel_id;
+ command.data_length = net.host_to_network(cast(u16) packet.data.count);
+
+ if packet.flags & .Reliable {
+ command.command = .Send_Reliable;
+ command.command |= .Flag_Acknowledge;
+ } elseif packet.flags & .Unsequenced {
+ command.command = .Send_Unsequenced;
+ command.command |= .Flag_Unsequenced;
+ } else {
+ command.command = .Send_Unreliable;
+ }
+
+ return peer_queue_outgoing_command(peer, command, packet) != null;
+}
+
+peer_queue_acknowledgement :: (peer: ^Peer, command: ^Protocol_Command_Header, sent_time: u16) {
+ peer.acknowledgements << .{ sent_time, command };
+}
+
+peer_flush_outgoing_commands :: (peer: ^Peer) -> i32 {
+ peer.last_send_time = peer.host.current_time;
+
+ send_buffer: [65535] u8;
+
+ total_sent := 0;
+ for peer.outgoing_commands {
+ to_send, success := it->pack_into_buffer(peer, send_buffer);
+ if !success do continue;
+
+ sent_bytes := peer.host.socket->sendto(to_send, ^peer.addr);
+ if sent_bytes < 0 {
+ return -1;
+ }
+
+ total_sent += sent_bytes;
+ peer.sent_reliable_commands << it;
+ }
+
+ array.clear(^peer.outgoing_commands);
+ return total_sent;
+}
+
+peer_send_acknowledgements :: (peer: ^Peer) {
+ send_buffer: [65535] u8;
+
+ for ^ack: peer.acknowledgements {
+ command: Protocol_Acknowledge;
+ command.command = .Acknowledge;
+ command.channel = ack.command.channel;
+ command.seq_number = 0;
+ command.recv_seq_number = ack.command.seq_number;
+ command.recv_sent_time = ack.sent_time;
+
+ out: Outgoing_Command;
+ out.command = ^command;
+
+ to_send, success := out->pack_into_buffer(peer, send_buffer);
+ if !success do continue;
+
+ sent_bytes := peer.host.socket->sendto(to_send, ^peer.addr);
+ if sent_bytes < 0 do return;
+ }
+
+ array.clear(^peer.acknowledgements);
}
+peer_remove_sent_reliable_command :: (peer: ^Peer, seq_num: u16, channel: Channel_ID) -> Command {
+ command: ^Outgoing_Command;
+ index := 0;
+ for peer.sent_reliable_commands {
+ if it.reliable_seq_number == seq_num && it.command.channel == channel {
+ command = it;
+ break;
+ }
+
+ index += 1;
+ }
+
+ if command == null do return .None;
+
+ defer {
+ cfree(command.command);
+ cfree(command);
+ array.fast_delete(^peer.sent_reliable_commands, index);
+ }
+ return command_get_effective(command.command.command);
+}
+Channel_ID :: u8;
+
Channel :: struct {
+ id: Channel_ID;
seq_number: u16;
-
+
+ reliable_windows: [] u16;
+ max_reliable_windows: u32;
+
+ outgoing_unreliable_seq_number: u16;
+ outgoing_reliable_seq_number: u16;
+ incoming_reliable_seq_number: u16;
+
}
+
+