From 69528a1b72bc578430e3a3e12d7cd63680986c29 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sun, 4 Jan 2026 20:25:30 -0500 Subject: Probe for optimal network buffer size. We want to match the underlying system socket buffer. Filling this buffer minimizes the number of syscalls we do. Larger would be a waste. Also changed parser to use enums that more closely match the NATS message types. --- src/server/Client.zig | 18 ++--- src/server/Server.zig | 172 ++++++++++++++++++++++++++++-------------- src/server/message_parser.zig | 66 ++++++++-------- 3 files changed, 159 insertions(+), 97 deletions(-) diff --git a/src/server/Client.zig b/src/server/Client.zig index 53a66b9..1e93360 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -41,29 +41,29 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { for (0..len) |i| { const msg = msgs[i]; defer switch (msg) { - .msg => |m| m.deinit(alloc), - .hmsg => |h| h.deinit(alloc), + .MSG => |m| m.deinit(alloc), + .HMSG => |h| h.deinit(alloc), else => {}, }; errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { - .msg => |m| { + .MSG => |m| { m.deinit(alloc); }, else => {}, }; switch (msg) { - .@"+ok" => { + .@"+OK" => { _ = try self.to_client.write("+OK\r\n"); }, - .pong => { + .PONG => { _ = try self.to_client.write("PONG\r\n"); }, - .info => |info| { + .INFO => |info| { _ = try self.to_client.write("INFO "); try std.json.Stringify.value(info, .{}, self.to_client); _ = try self.to_client.write("\r\n"); }, - .msg => |m| { + .MSG => |m| { @branchHint(.likely); try self.to_client.print( "MSG {s} {s} {s} {d}\r\n{s}\r\n", @@ -76,7 +76,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { }, ); }, - .hmsg => |hmsg| { + .HMSG => |hmsg| { @branchHint(.likely); try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ hmsg.msg.subject, @@ -87,7 +87,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { hmsg.msg.payload, }); }, - .@"-err" => |s| { + .@"-ERR" => |s| { _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); }, else => |m| { diff --git a/src/server/Server.zig b/src/server/Server.zig index eaecdf2..f7b849c 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -4,6 +4,7 @@ const ArrayList = std.ArrayList; const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged; const Io = std.Io; +const Dir = Io.Dir; const Group = Io.Group; const IpAddress = std.Io.net.IpAddress; const Mutex = Io.Mutex; @@ -21,6 +22,7 @@ pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, + queue: *Queue(Message), fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -63,13 +65,24 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void { var client_group: Group = .init; defer client_group.cancel(io); + const read_buffer_size, const write_buffer_size = getBufferSizes(io); + log.debug("read buf: {d} write buf: {d}", .{ read_buffer_size, write_buffer_size }); + var id: usize = 0; while (true) : (id +%= 1) { if (server.clients.contains(id)) continue; log.debug("Accepting next client", .{}); const stream = try tcp_server.accept(io); log.debug("Accepted connection {d}", .{id}); - _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch { + _ = client_group.concurrent(io, handleConnectionInfallible, .{ + server, + gpa, + io, + id, + stream, + read_buffer_size, + write_buffer_size, + }) catch { log.err("Could not start concurrent handler for {d}", .{id}); stream.close(io); }; @@ -96,13 +109,29 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void { } } -fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void { - handleConnection(server, server_allocator, io, id, stream) catch |err| { +fn handleConnectionInfallible( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + r_buf_size: usize, + w_buf_size: usize, +) void { + handleConnection(server, server_allocator, io, id, stream, r_buf_size, w_buf_size) catch |err| { log.err("Failed processing client {d}: {any}", .{ id, err }); }; } -fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void { +fn handleConnection( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + r_buf_size: usize, + w_buf_size: usize, +) !void { defer stream.close(io); // TODO: use a client allocator for things that should only live for as long as the client? @@ -111,26 +140,27 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us // messages when done processing them (usually outside the client process, ie: publish). // Set up client writer - // TODO: how many bytes can fit in a network write syscall? cat /proc/sys/net/core/wmem_max - var w_buffer: [212992]u8 = undefined; - var writer = stream.writer(io, &w_buffer); + const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size); + defer server_allocator.free(w_buffer); + var writer = stream.writer(io, w_buffer); const out = &writer.interface; // Set up client reader - // TODO: how many bytes can fit in a network read syscall? cat /proc/sys/net/core/rmem_max - var r_buffer: [212992]u8 = undefined; - var reader = stream.reader(io, &r_buffer); + const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size); + defer server_allocator.free(r_buffer); + var reader = stream.reader(io, r_buffer); const in = &reader.interface; // Set up buffer queue - var qbuf: [r_buffer.len / @sizeOf(Message)]Message = undefined; - var queue: Queue(Message) = .init(&qbuf); + const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message)); + defer server_allocator.free(qbuf); + var queue: Queue(Message) = .init(qbuf); defer { queue.close(io); while (queue.getOne(io)) |msg| { switch (msg) { - .msg => |m| m.deinit(server_allocator), - .hmsg => |h| h.deinit(server_allocator), + .MSG => |m| m.deinit(server_allocator), + .HMSG => |h| h.deinit(server_allocator), else => {}, } } else |_| {} @@ -144,7 +174,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us defer server.removeClient(io, server_allocator, id); // Do initial handshake with client - try queue.putOne(io, .{ .info = server.info }); + try queue.putOne(io, .{ .INFO = server.info }); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); defer client_task.cancel(io) catch {}; @@ -152,27 +182,28 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us // Messages are owned by the server after they are received from the client while (client.next(server_allocator)) |msg| { switch (msg) { - .ping => { + .PING => { // Respond to ping with pong. - try client.send(io, .pong); + try client.send(io, .PONG); }, - .@"pub", .hpub => { + .PUB, .HPUB => { + @branchHint(.likely); defer switch (msg) { - .@"pub" => |pb| pb.deinit(server_allocator), - .hpub => |hp| hp.deinit(server_allocator), + .PUB => |pb| pb.deinit(server_allocator), + .HPUB => |hp| hp.deinit(server_allocator), else => unreachable, }; try server.publishMessage(io, server_allocator, &client, msg); }, - .sub => |sub| { + .SUB => |sub| { defer sub.deinit(server_allocator); - try server.subscribe(io, server_allocator, id, sub); + try server.subscribe(io, server_allocator, id, &queue, sub); }, - .unsub => |unsub| { + .UNSUB => |unsub| { defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, - .connect => |connect| { + .CONNECT => |connect| { if (client.connect) |*current| { current.deinit(server_allocator); } @@ -227,53 +258,46 @@ test subjectMatches { } fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { - errdefer { - if (source_client.connect) |c| { - if (c.verbose) { - source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; - } + defer if (source_client.connect) |c| { + if (c.verbose) { + source_client.send(io, .@"+OK") catch {}; } - } + }; + const subject = switch (msg) { - .@"pub" => |pb| pb.subject, - .hpub => |hp| hp.@"pub".subject, + .PUB => |pb| pb.subject, + .HPUB => |hp| hp.@"pub".subject, else => unreachable, }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |subscription| { if (subjectMatches(subscription.subject, subject)) { - const client = server.clients.get(subscription.client_id) orelse { - log.debug("Trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); - continue; - }; - switch (msg) { - .@"pub" => |pb| client.send(io, .{ - .msg = try pb.toMsg(alloc, subscription.sid), - }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, + .PUB => |pb| { + try subscription.queue.putOne(io, .{ + .MSG = try pb.toMsg(alloc, subscription.sid), + }); }, - .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( - alloc, - subscription.sid, - ) }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, + .HPUB => |hp| { + try subscription.queue.putOne(io, .{ + .HMSG = try hp.toHMsg(alloc, subscription.sid), + }); }, else => unreachable, } } } - if (source_client.connect) |c| { - if (c.verbose) { - source_client.send(io, .@"+ok") catch {}; - } - } } -fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Sub) !void { +fn subscribe( + server: *Server, + io: Io, + gpa: Allocator, + id: usize, + queue: *Queue(Message), + msg: Message.Sub, +) !void { try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const subject = try gpa.dupe(u8, msg.subject); @@ -284,10 +308,17 @@ fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Su .subject = subject, .client_id = id, .sid = sid, + .queue = queue, }); } -fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Unsub) !void { +fn unsubscribe( + server: *Server, + io: Io, + gpa: Allocator, + id: usize, + msg: Message.Unsub, +) !void { try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const len = server.subscriptions.items.len; @@ -301,5 +332,36 @@ fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message. } } +const parseUnsigned = std.fmt.parseUnsigned; + +fn getBufferSizes(io: Io) struct { usize, usize } { + const default_size = 4 * 1024; + const default = .{ default_size, default_size }; + + const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { + log.err("couldn't open /proc/sys/net/core", .{}); + return default; + }; + + var buf: [64]u8 = undefined; + + const rmem_max = readBufferSize(io, dir, "rmem_max", &buf, default_size); + const wmem_max = readBufferSize(io, dir, "wmem_max", &buf, default_size); + + return .{ rmem_max, wmem_max }; +} + +fn readBufferSize(io: Io, dir: anytype, filename: []const u8, buf: []u8, default: usize) usize { + const bytes = dir.readFile(io, filename, buf) catch |err| { + log.err("couldn't open {s}: {any}", .{ filename, err }); + return default; + }; + + return parseUnsigned(usize, bytes[0 .. bytes.len - 1], 10) catch |err| { + log.err("couldn't parse {s}: {any}", .{ bytes[0 .. bytes.len - 1], err }); + return default; + }; +} + pub const default_id = "server-id-123"; pub const default_name = "Zits Server"; diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 1e7527d..8b4859b 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -20,18 +20,18 @@ const log = std.log; pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const Message = union(enum) { - info: ServerInfo, - connect: Connect, - @"pub": Pub, - hpub: HPub, - sub: Sub, - unsub: Unsub, - msg: Msg, - hmsg: HMsg, - ping, - pong, - @"+ok": void, - @"-err": []const u8, + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, @@ -220,15 +220,15 @@ pub const Message = union(enum) { const client_types = StaticStringMap(MessageType).initComptime( .{ // {"INFO", .info}, - .{ "CONNECT", .connect }, - .{ "PUB", .@"pub" }, - .{ "HPUB", .hpub }, - .{ "SUB", .sub }, - .{ "UNSUB", .unsub }, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, // {"MSG", .msg}, // {"HMSG", .hmsg}, - .{ "PING", .ping }, - .{ "PONG", .pong }, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, // {"+OK", .@"+ok"}, // {"-ERR", .@"-err"}, }, @@ -267,7 +267,7 @@ pub const Message = union(enum) { errdefer log.err("Failed to parse {s}", .{operation_string.items}); switch (operation) { - .connect => { + .CONNECT => { // for storing the json string var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); @@ -295,28 +295,28 @@ pub const Message = union(enum) { .{ .allocate = .alloc_always }, ); - return .{ .connect = try res.dupe(alloc) }; + return .{ .CONNECT = try res.dupe(alloc) }; }, - .@"pub" => { + .PUB => { @branchHint(.likely); return parsePub(alloc, in); }, - .hpub => { + .HPUB => { @branchHint(.likely); return parseHPub(alloc, in); }, - .ping => { + .PING => { try expectStreamBytes(in, "\r\n"); - return .ping; + return .PING; }, - .pong => { + .PONG => { try expectStreamBytes(in, "\r\n"); - return .pong; + return .PONG; }, - .sub => { + .SUB => { return parseSub(alloc, in); }, - .unsub => { + .UNSUB => { return parseUnsub(alloc, in); }, else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), @@ -385,7 +385,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message { } return .{ - .sub = .{ + .SUB = .{ .subject = subject, .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), @@ -546,7 +546,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message { } return .{ - .unsub = .{ + .UNSUB = .{ .sid = try first.toOwnedSlice(alloc), .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, }, @@ -671,7 +671,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { try expectStreamBytes(in, "\r\n"); return .{ - .@"pub" = .{ + .PUB = .{ .subject = subject, .payload = try payload.toOwnedSlice(), .reply_to = reply_to, @@ -843,7 +843,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { try expectStreamBytes(in, "\r\n"); return .{ - .hpub = .{ + .HPUB = .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, -- cgit