From 69528a1b72bc578430e3a3e12d7cd63680986c29 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sun, 4 Jan 2026 20:25:30 -0500 Subject: Probe for optimal network buffer size. We want to match the underlying system socket buffer. Filling this buffer minimizes the number of syscalls we do. Larger would be a waste. Also changed parser to use enums that more closely match the NATS message types. --- src/server/message_parser.zig | 66 +++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 33 deletions(-) (limited to 'src/server/message_parser.zig') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 1e7527d..8b4859b 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -20,18 +20,18 @@ const log = std.log; pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const Message = union(enum) { - info: ServerInfo, - connect: Connect, - @"pub": Pub, - hpub: HPub, - sub: Sub, - unsub: Unsub, - msg: Msg, - hmsg: HMsg, - ping, - pong, - @"+ok": void, - @"-err": []const u8, + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, @@ -220,15 +220,15 @@ pub const Message = union(enum) { const client_types = StaticStringMap(MessageType).initComptime( .{ // {"INFO", .info}, - .{ "CONNECT", .connect }, - .{ "PUB", .@"pub" }, - .{ "HPUB", .hpub }, - .{ "SUB", .sub }, - .{ "UNSUB", .unsub }, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, // {"MSG", .msg}, // {"HMSG", .hmsg}, - .{ "PING", .ping }, - .{ "PONG", .pong }, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, // {"+OK", .@"+ok"}, // {"-ERR", .@"-err"}, }, @@ -267,7 +267,7 @@ pub const Message = union(enum) { errdefer log.err("Failed to parse {s}", .{operation_string.items}); switch (operation) { - .connect => { + .CONNECT => { // for storing the json string var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); @@ -295,28 +295,28 @@ pub const Message = union(enum) { .{ .allocate = .alloc_always }, ); - return .{ .connect = try res.dupe(alloc) }; + return .{ .CONNECT = try res.dupe(alloc) }; }, - .@"pub" => { + .PUB => { @branchHint(.likely); return parsePub(alloc, in); }, - .hpub => { + .HPUB => { @branchHint(.likely); return parseHPub(alloc, in); }, - .ping => { + .PING => { try expectStreamBytes(in, "\r\n"); - return .ping; + return .PING; }, - .pong => { + .PONG => { try expectStreamBytes(in, "\r\n"); - return .pong; + return .PONG; }, - .sub => { + .SUB => { return parseSub(alloc, in); }, - .unsub => { + .UNSUB => { return parseUnsub(alloc, in); }, else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), @@ -385,7 +385,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message { } return .{ - .sub = .{ + .SUB = .{ .subject = subject, .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), @@ -546,7 +546,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message { } return .{ - .unsub = .{ + .UNSUB = .{ .sid = try first.toOwnedSlice(alloc), .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, }, @@ -671,7 +671,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { try expectStreamBytes(in, "\r\n"); return .{ - .@"pub" = .{ + .PUB = .{ .subject = subject, .payload = try payload.toOwnedSlice(), .reply_to = reply_to, @@ -843,7 +843,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { try expectStreamBytes(in, "\r\n"); return .{ - .hpub = .{ + .HPUB = .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, -- cgit