From 474df6deed910172ffbc1bed26a14176bfa5540a Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Fri, 21 Nov 2025 13:21:23 -0500 Subject: --- src/server/message_parser.zig | 74 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 15 deletions(-) (limited to 'src/server') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 404a4e3..2bc5286 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -94,9 +94,11 @@ const Message = union(MessageType) { }; fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { - const slice = try in.takeDelimiterInclusive('}'); - - return std.json.parseFromSliceLeaky(T, alloc, slice, .{}); + var reader: std.json.Reader = .init(alloc, in); + return std.json.innerParse(T, alloc, &reader, .{ + .max_value_len = std.json.default_max_value_len, + .allocate = .alloc_always, + }); } fn parsePub(in: *std.Io.Reader) !Message.Pub { @@ -136,22 +138,69 @@ pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { break :blk MessageType.parse(word[0..len]) orelse return null; }; std.debug.print("buffered: '{s}'\n", .{in.buffered()}); - defer in.toss(2); // CRLF + // defer in.toss(2); // CRLF return switch (message_type) { - .connect => .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }, + .connect => blk: { + const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }; + std.debug.print("value: {s}\n", .{value.?.connect.name.?}); + std.debug.print("buffered: '{d}'\n", .{in.buffered().len}); + break :blk value; + }, .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, .ping => .ping, else => null, }; } +// test parseNextMessage { +// { +// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\naftertheendoftheinput"; +// var reader: std.Io.Reader = .fixed(input); +// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); +// defer arena.deinit(); +// const gpa = arena.allocator(); +// const msg: ?Message = parseNextMessage(gpa, &reader); +// const expected: ?Message = .{ .connect = .{ +// .verbose = false, +// .pedantic = false, +// .tls_required = false, +// .name = "NATS CLI Version v0.2.4", +// .lang = "go", +// .version = "1.43.0", +// .protocol = 1, +// .echo = true, +// .headers = true, +// .no_responders = true, +// } }; +// try std.testing.expect(msg != null); +// try std.testing.expectEqualDeep(msg, expected); +// } +// { +// const input = "PUB hi 3\r\nfoo\r\n"; +// var reader: std.Io.Reader = .fixed(input); +// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); +// defer arena.deinit(); +// const gpa = arena.allocator(); +// const msg: ?Message = parseNextMessage(gpa, &reader); +// std.debug.print("msg: {any}\n", .{msg}); +// const expected: ?Message = .{ .@"pub" = .{ +// .subject = "hi", +// .bytes = 3, +// .payload = "foo", +// } }; +// try std.testing.expect(msg != null); +// try std.testing.expectEqualDeep(msg, expected); +// } +// } + test parseNextMessage { + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n"; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + { - const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n"; - var reader: std.Io.Reader = .fixed(input); - var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); - defer arena.deinit(); - const gpa = arena.allocator(); const msg: ?Message = parseNextMessage(gpa, &reader); const expected: ?Message = .{ .connect = .{ .verbose = false, @@ -169,11 +218,6 @@ test parseNextMessage { try std.testing.expectEqualDeep(msg, expected); } { - const input = "PUB hi 3\r\nfoo\r\n"; - var reader: std.Io.Reader = .fixed(input); - var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); - defer arena.deinit(); - const gpa = arena.allocator(); const msg: ?Message = parseNextMessage(gpa, &reader); std.debug.print("msg: {any}\n", .{msg}); const expected: ?Message = .{ .@"pub" = .{ -- cgit