From 5471cbe15587eacedac6a2672b696b7bfdbad694 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Wed, 19 Nov 2025 09:49:32 -0500 Subject: Process pub --- src/server/message_parser.zig | 73 ++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 28 deletions(-) (limited to 'src/server/message_parser.zig') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 369f326..917ea70 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -47,7 +47,6 @@ pub const MessageType = enum { }, ); fn parseStaticStringMap(input: []const u8) ?MessageType { - std.debug.print("input: '{s}'\n", .{input}); return client_types.get(input); } @@ -88,7 +87,7 @@ const Message = union(MessageType) { }; const Pub = struct { subject: []const u8, - reply_to: ?[]const u8, + reply_to: ?[]const u8 = null, bytes: usize, payload: []const u8, }; @@ -102,13 +101,15 @@ fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { fn parsePub(in: *std.Io.Reader) !Message.Pub { const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; - const next = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; + const next = (try in.takeDelimiter('\r')) orelse return error.EndOfStream; + std.debug.print("next: '{s}'\n", .{next}); var reply_to: ?[]const u8 = null; const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: { reply_to = next; break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10); }; - in.toss(2); // CRLF + + in.toss(1); // LF const payload = try in.take(bytes); return .{ @@ -126,38 +127,54 @@ pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { std.debug.print("word: {s}\n", .{word}); break :blk MessageType.parse(word) orelse return null; }; - defer in.toss(2); // CRLF + // defer in.toss(2); // CRLF return switch (message_type) { .connect => .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }, - .@"pub" => .{ .@"pub" = parsePub(in) catch return null }, + .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, .ping => .{ .ping = {} }, else => null, }; } test parseNextMessage { - const input = - \\CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS CLI Version v0.2.4","lang":"go","version":"1.43.0","protocol":1,"echo":true,"headers":true,"no_responders":true}\r\n - ; - var reader: std.Io.Reader = .fixed(input); - var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); - defer arena.deinit(); - const gpa = arena.allocator(); - const msg: ?Message = parseNextMessage(gpa, &reader); - const expected: ?Message = .{ .connect = .{ - .verbose = false, - .pedantic = false, - .tls_required = false, - .name = "NATS CLI Version v0.2.4", - .lang = "go", - .version = "1.43.0", - .protocol = 1, - .echo = true, - .headers = true, - .no_responders = true, - } }; - try std.testing.expect(msg != null); - try std.testing.expectEqualDeep(msg, expected); + { + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n"; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + const msg: ?Message = parseNextMessage(gpa, &reader); + const expected: ?Message = .{ .connect = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + } }; + try std.testing.expect(msg != null); + try std.testing.expectEqualDeep(msg, expected); + } + { + const input = "PUB hi 3\r\nfoo\r\n"; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + const msg: ?Message = parseNextMessage(gpa, &reader); + std.debug.print("msg: {any}\n", .{msg}); + const expected: ?Message = .{ .@"pub" = .{ + .subject = "hi", + .bytes = 3, + .payload = "foo", + } }; + try std.testing.expect(msg != null); + try std.testing.expectEqualDeep(msg, expected); + } } test "MessageType.parse performance" { -- cgit