From 826da348a51c0650394e564850e9a0c65c1cfeea Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Tue, 2 Dec 2025 22:37:50 -0500 Subject: --- src/server/message_parser.zig | 159 ++++++++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 68 deletions(-) (limited to 'src/server/message_parser.zig') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index bee11bc..e791d14 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -31,9 +31,9 @@ pub const MessageType = enum { } }; -pub const Message = union(enum) { +pub const Message = union(MessageType) { info: ServerInfo, - connect: Connect, + connect: AllocatedConnect, @"pub": Pub, hpub: void, sub: Sub, @@ -71,6 +71,14 @@ pub const Message = union(enum) { /// feature. proto: u32 = 1, }; + pub const AllocatedConnect = struct { + allocator: std.heap.ArenaAllocator, + connect: Connect, + + pub fn deinit(self: *AllocatedConnect) void { + self.allocator.deinit(); + } + }; pub const Connect = struct { verbose: bool = false, pedantic: bool = false, @@ -136,8 +144,20 @@ pub const Message = union(enum) { /// An error should be handled by cleaning up this connection. pub fn next(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + // errdefer |err| { + // std.debug.print("Error occurred: {}\n", .{err}); + + // // Get the error return trace + // if (@errorReturnTrace()) |trace| { + // std.debug.print("Error return trace:\n", .{}); + // std.debug.dumpStackTrace(trace); + // } else { + // std.debug.print("No error return trace available\n", .{}); + // } + // } + var operation_string: std.ArrayList(u8) = blk: { - var buf: ["CONTINUE".len]u8 = undefined; + var buf: ["CONTINUE".len + 1]u8 = undefined; break :blk .initBuffer(&buf); }; @@ -149,15 +169,15 @@ pub const Message = union(enum) { } else |err| return err; const operation = parse(operation_string.items) orelse { - std.debug.print("operation: '{s}'\n", .{operation_string.items}); - std.debug.print("buffered: '{s}'", .{in.buffered()}); return error.InvalidOperation; }; switch (operation) { .connect => { // TODO: should be ARENA allocator - var connect_string_writer_allocating: std.Io.Writer.Allocating = try .initCapacity(alloc, 1024); + var connect_arena_allocator: std.heap.ArenaAllocator = .init(alloc); + const connect_allocator = connect_arena_allocator.allocator(); + const connect_string_writer_allocating: std.Io.Writer.Allocating = try .initCapacity(connect_allocator, 1024); var connect_string_writer = connect_string_writer_allocating.writer; try in.discardAll(1); // throw away space @@ -167,9 +187,9 @@ pub const Message = union(enum) { std.debug.assert(std.mem.eql(u8, try in.take(3), "}\r\n")); // discard '}\r\n' // TODO: should be CONNECTION allocator - const res = try std.json.parseFromSliceLeaky(Connect, alloc, connect_string_writer.buffered(), .{ .allocate = .alloc_always }); + const res = try std.json.parseFromSliceLeaky(Connect, connect_allocator, connect_string_writer.buffered(), .{ .allocate = .alloc_always }); - return .{ .connect = res }; + return .{ .connect = .{ .allocator = connect_arena_allocator, .connect = res } }; }, .@"pub" => { try in.discardAll(1); // throw away space @@ -311,67 +331,70 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub { } // try returning error in debug mode, only null in release? -pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { - const message_type: MessageType = blk: { - var word: ["CONNECT".len]u8 = undefined; - var len: usize = 0; - for (&word, 0..) |*b, i| { - const byte = in.takeByte() catch return null; - if (std.ascii.isUpper(byte)) { - b.* = byte; - len = i + 1; - } else break; - } - - break :blk Message.parse(word[0..len]) orelse return null; - }; - - // defer in.toss(2); // CRLF - return switch (message_type) { - .connect => blk: { - const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }; - - break :blk value; - }, - .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, - .ping => .ping, - else => null, - }; -} - -test parseNextMessage { - const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n"; - var reader: std.Io.Reader = .fixed(input); - var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); - defer arena.deinit(); - const gpa = arena.allocator(); +// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { +// const message_type: MessageType = blk: { +// var word: ["CONNECT".len]u8 = undefined; +// var len: usize = 0; +// for (&word, 0..) |*b, i| { +// const byte = in.takeByte() catch return null; +// if (std.ascii.isUpper(byte)) { +// b.* = byte; +// len = i + 1; +// } else break; +// } + +// break :blk Message.parse(word[0..len]) orelse return null; +// }; + +// // defer in.toss(2); // CRLF +// return switch (message_type) { +// .connect => blk: { +// const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }; + +// break :blk value; +// }, +// .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, +// .ping => .ping, +// else => null, +// }; +// } - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ .connect = .{ - .verbose = false, - .pedantic = false, - .tls_required = false, - .name = "NATS CLI Version v0.2.4", - .lang = "go", - .version = "1.43.0", - .protocol = 1, - .echo = true, - .headers = true, - .no_responders = true, - } }; - - try std.testing.expectEqualDeep(expected, msg); - } - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ .@"pub" = .{ - .subject = "hi", - .payload = "foo", - } }; - try std.testing.expectEqualDeep(expected, msg); - } -} +// test parseNextMessage { +// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n"; +// var reader: std.Io.Reader = .fixed(input); +// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); +// defer arena.deinit(); +// const gpa = arena.allocator(); + +// { +// const msg: Message = try Message.next(gpa, &reader); +// const expected: Message = .{ .connect = .{ +// .connect = .{ +// .verbose = false, +// .pedantic = false, +// .tls_required = false, +// .name = try gpa.dupe(u8, "NATS CLI Version v0.2.4"), +// .lang = try gpa.dupe(u8, "go"), +// .version = try gpa.dupe(u8, "1.43.0"), +// .protocol = 1, +// .echo = true, +// .headers = true, +// .no_responders = true, +// }, +// .allocator = arena, +// } }; + +// try std.testing.expectEqualDeep(expected, msg); +// } +// { +// const msg: Message = try Message.next(gpa, &reader); +// const expected: Message = .{ .@"pub" = .{ +// .subject = "hi", +// .payload = "foo", +// } }; +// try std.testing.expectEqualDeep(expected, msg); +// } +// } // test "MessageType.parse performance" { // // Measure perf for parseMemEql -- cgit