crasa-transport/src/udp.zig

163 lines
5.6 KiB
Zig

// A bidirectional, reliable UDP transport.
//
// This is the CRU protocol, short for crasa-reliable-udp.
//
// TODO: document the packet layout of the payload here.
//
// The design is a non-blocking transport engine that callers poll for newly
// available messages. Messages are sent through as fast as they can be;
// mangled or unacknowledged messages are replayed by the server over time.
//
// Messages from the server will reach the client /eventually/, and they may
// arrive out of order.
/// UDP engine that runs a server loop or a client loop on a background thread.
///
/// What the code currently implements is a `HELLO` handshake: the client sends
/// `HELLO` to the configured address, and the server answers every `HELLO`
/// datagram with `HELLO`. Both loops use non-blocking sockets and poll until
/// `signalExit` becomes true.
///
/// Lifetime: worker threads hold a pointer to the engine, so `destroy` raises
/// `signalExit` and joins every worker before releasing the engine's memory.
pub const UdpEngine = struct {
    /// Allocator that owns the engine itself and the worker bookkeeping list.
    allocator: std.mem.Allocator,
    /// Port most recently passed to `serve` or `connect`.
    port: u16 = 6700,
    /// Set to true to ask every worker loop to stop; polled once per iteration.
    signalExit: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
    /// Bind address (server) or remote address (client).
    address: std.net.Address,
    /// Handles of the threads started by `serve`/`connect`; joined in `destroy`.
    workers: std.ArrayListUnmanaged(std.Thread) = .empty,

    /// Size of the per-worker receive buffer, in bytes.
    const recv_buffer_size = 1024;
    /// How long a worker sleeps when no datagram is pending.
    const poll_interval_ns = 20 * std.time.ns_per_ms;
    /// Payload exchanged by the handshake.
    const hello = "HELLO";

    /// Allocates an engine on `allocator`. Release it with `destroy`.
    ///
    /// NOTE(review): the original note here asks for a thread-safe allocator
    /// because it is meant to back allocations made on the worker thread. The
    /// worker loops in this file do not allocate yet — confirm before relying
    /// on either behaviour.
    pub fn create(allocator: std.mem.Allocator) !*@This() {
        const self = try allocator.create(@This());
        self.* = .{
            .allocator = allocator,
            // NOTE(review): this placeholder port (5000) differs from the
            // `port` field default (6700); both are overwritten by
            // `serve`/`connect`.
            .address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 5000),
        };
        return self;
    }

    /// Starts a background server bound to `0.0.0.0:port`.
    ///
    /// Returns once the thread has been spawned; socket and bind failures are
    /// logged by the worker rather than returned here.
    pub fn serve(self: *@This(), port: u16) !void {
        // Reserve the slot first so a spawned thread can never be left untracked.
        try self.workers.ensureUnusedCapacity(self.allocator, 1);
        self.port = port;
        self.address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, port);
        const thread = try std.Thread.spawn(.{}, startServer, .{self});
        self.workers.appendAssumeCapacity(thread);
    }

    /// Starts a background client that sends `HELLO` to `host:port`.
    ///
    /// `host` must be an IP literal; the name `localhost` is mapped to
    /// `127.0.0.1`. Returns once the thread has been spawned; socket errors
    /// are logged by the worker rather than returned here.
    pub fn connect(self: *@This(), host: []const u8, port: u16) !void {
        try self.workers.ensureUnusedCapacity(self.allocator, 1);
        log.info("parsing host '{s}'", .{host});
        const resolved_host = if (std.mem.eql(u8, host, "localhost"))
            "127.0.0.1"
        else
            host;
        self.address = try std.net.Address.resolveIp(resolved_host, port);
        self.port = port;
        const thread = try std.Thread.spawn(.{}, startClient, .{self});
        self.workers.appendAssumeCapacity(thread);
    }

    /// Server worker: binds `self.address` and replies `HELLO` to every
    /// `HELLO` datagram until `signalExit` is set or a receive error occurs.
    pub fn startServer(self: *@This()) void {
        // Snapshot the address so a later `serve`/`connect` on the same engine
        // cannot change it underneath this loop.
        const bind_addr = self.address;

        const sockfd = openSocket(bind_addr.any.family) catch |err| {
            log.err("server socket error: {any}", .{err});
            return;
        };
        defer posix.close(sockfd);

        posix.bind(sockfd, &bind_addr.any, bind_addr.getOsSockLen()) catch |err| {
            log.err("server bind error: {any}", .{err});
            return;
        };
        log.info("udp server listening on port {d}", .{bind_addr.getPort()});

        var buffer: [recv_buffer_size]u8 = undefined;
        // `sockaddr.storage` is large and aligned enough for any address family.
        var peer_storage: posix.sockaddr.storage = undefined;
        var peer_len: posix.socklen_t = 0;
        while (!self.signalExit.load(.monotonic)) {
            const received = pollDatagram(sockfd, &buffer, &peer_storage, &peer_len) catch |err| {
                log.err("server recv error: {any}", .{err});
                break;
            };
            const n = received orelse continue;

            const peer = std.net.Address.initPosix(@ptrCast(@alignCast(&peer_storage)));
            log.info("server recv {d} bytes from {f}", .{ n, peer });

            if (!std.mem.eql(u8, buffer[0..n], hello)) continue;
            _ = posix.sendto(sockfd, hello, 0, @ptrCast(&peer_storage), peer_len) catch |err| {
                log.err("server send error: {any}", .{err});
                continue;
            };
            log.info("server replied HELLO", .{});
        }
    }

    /// Client worker: sends one `HELLO` to `self.address`, then logs every
    /// datagram received until `signalExit` is set or a receive error occurs.
    pub fn startClient(self: *@This()) void {
        const remote = self.address;

        const sockfd = openSocket(remote.any.family) catch |err| {
            log.err("client socket error: {any}", .{err});
            return;
        };
        defer posix.close(sockfd);

        // Bind an ephemeral port on the wildcard address of the socket's own
        // family; an IPv4 wildcard cannot be bound on an IPv6 socket.
        const local_addr = wildcardAddress(remote.any.family);
        posix.bind(sockfd, &local_addr.any, local_addr.getOsSockLen()) catch |err| {
            log.err("client bind error: {any}", .{err});
            return;
        };

        _ = posix.sendto(sockfd, hello, 0, &remote.any, remote.getOsSockLen()) catch |err| {
            log.err("client send error: {any}", .{err});
            return;
        };
        log.info("client sent HELLO to {f}", .{remote});

        var buffer: [recv_buffer_size]u8 = undefined;
        var peer_storage: posix.sockaddr.storage = undefined;
        var peer_len: posix.socklen_t = 0;
        while (!self.signalExit.load(.monotonic)) {
            const received = pollDatagram(sockfd, &buffer, &peer_storage, &peer_len) catch |err| {
                log.err("client recv error: {any}", .{err});
                break;
            };
            const n = received orelse continue;

            log.info("client recv {d} bytes", .{n});
            if (std.mem.eql(u8, buffer[0..n], hello)) {
                log.info("client received HELLO reply", .{});
            }
        }
    }

    /// Reports whether an exit has been requested via `signalExit`.
    pub fn shouldExit(self: *const @This()) bool {
        return self.signalExit.load(.monotonic);
    }

    /// Stops all workers and frees the engine.
    ///
    /// Sets `signalExit`, joins every thread started by `serve`/`connect`
    /// (each notices the flag within roughly one poll interval), then releases
    /// the engine. `self` must not be used afterwards.
    pub fn destroy(self: *@This()) void {
        self.signalExit.store(true, .monotonic);
        for (self.workers.items) |worker| worker.join();
        self.workers.deinit(self.allocator);
        self.allocator.destroy(self);
    }

    /// Opens a non-blocking, close-on-exec UDP socket for `family`.
    fn openSocket(family: posix.sa_family_t) !posix.socket_t {
        return posix.socket(
            family,
            posix.SOCK.DGRAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK,
            posix.IPPROTO.UDP,
        );
    }

    /// Returns the wildcard address with port 0 for `family`.
    fn wildcardAddress(family: posix.sa_family_t) std.net.Address {
        return switch (family) {
            posix.AF.INET6 => std.net.Address.initIp6([_]u8{0} ** 16, 0, 0, 0),
            else => std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 0),
        };
    }

    /// Tries to receive one datagram into `buffer`.
    ///
    /// Returns the byte count, or `null` after sleeping one poll interval when
    /// nothing is pending. On success `src_addr`/`src_len` describe the sender.
    /// Any receive error other than `WouldBlock` is returned to the caller.
    fn pollDatagram(
        sockfd: posix.socket_t,
        buffer: []u8,
        src_addr: *posix.sockaddr.storage,
        src_len: *posix.socklen_t,
    ) !?usize {
        // recvfrom treats the length as in/out, so reset it before every call.
        src_len.* = @sizeOf(posix.sockaddr.storage);
        const raw_addr: *posix.sockaddr = @ptrCast(src_addr);
        return posix.recvfrom(sockfd, buffer, 0, raw_addr, src_len) catch |err| switch (err) {
            error.WouldBlock => {
                std.Thread.sleep(poll_interval_ns);
                return null;
            },
            else => return err,
        };
    }
};
const std = @import("std");
const posix = std.posix;
const log = @import("logging.zig").log;