From 9e32d014c234f2bdded380de5193b05469739a70 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sat, 3 Jan 2026 02:33:24 +0000 Subject: Restructuring parser Adding tests fore everything --- src/server/message_parser.zig | 366 ++++++++++++++++++++++++++++++------------ 1 file changed, 261 insertions(+), 105 deletions(-) diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 0d60df5..3adf704 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -168,6 +168,10 @@ pub const Message = union(enum) { sid: []const u8, /// A number of messages to wait for before automatically unsubscribing. max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void { + alloc.free(self.sid); + } }; pub const Msg = struct { subject: []const u8, @@ -231,6 +235,8 @@ pub const Message = union(enum) { break :blk .initBuffer(&buf); }; + std.log.debug("buffered: '{s}'", .{in.buffered()}); + while (in.peekByte()) |byte| { if (std.ascii.isUpper(byte)) { try operation_string.appendBounded(byte); @@ -239,9 +245,12 @@ pub const Message = union(enum) { } else |err| return err; const operation = parse(operation_string.items) orelse { + std.log.err("Invalid operation: '{s}'", .{operation_string.items}); return error.InvalidOperation; }; + errdefer std.log.err("Failed to parse {s}", .{operation_string.items}); + switch (operation) { .connect => { // for storing the json string @@ -290,104 +299,239 @@ pub const Message = union(enum) { return .pong; }, .sub => { - if (!std.ascii.isWhitespace(try in.takeByte())) { - @branchHint(.unlikely); - return error.InvalidStream; - } - const subject = try readSubject(alloc, in, .sub); - errdefer alloc.free(subject); - const second = blk: { - // Drop whitespace - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - in.toss(1); - } else break; - } else |err| return err; - - var acc: std.ArrayList(u8) = .empty; - errdefer acc.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try acc.append(alloc, byte); - in.toss(1); - } else |err| return err; - - break :blk try acc.toOwnedSlice(alloc); - }; - errdefer alloc.free(second); - const queue_group = if ((try in.peekByte()) != '\r') second else null; - // We do not need an errdefer free for queue group, because it will only be second (already has errdefer free) or null. - const sid = if (queue_group) |_| try alloc.dupe(u8, try in.takeDelimiterExclusive('\r')) else second; - // if queue_group is null, that means sid is second, and already has an errdefer free. - errdefer if (queue_group) |_| alloc.free(sid); - try expectStreamBytes(in, "\r\n"); - return .{ - .sub = .{ - .subject = subject, - .queue_group = queue_group, - .sid = sid, - }, - }; + return parseSub(alloc, in); }, .unsub => { - if (!std.ascii.isWhitespace(try in.takeByte())) { - @branchHint(.unlikely); - return error.InvalidStream; - } - // Parse sid - const sid = blk: { - var acc: std.ArrayList(u8) = .empty; - errdefer acc.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try acc.append(alloc, byte); - in.toss(1); - } else |err| return err; - break :blk try acc.toOwnedSlice(alloc); - }; - errdefer alloc.free(sid); - - if ((try in.peekByte()) == '\r') { - try expectStreamBytes(in, "\r\n"); - return .{ - .unsub = .{ - .sid = sid, - }, - }; - } else if (std.ascii.isWhitespace(try in.peekByte())) { - in.toss(1); - const max_msgs = blk: { - var max_msgs_list: std.ArrayList(u8) = .empty; - errdefer max_msgs_list.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; - } - - if (std.ascii.isDigit(byte)) { - try max_msgs_list.append(alloc, byte); - } else { - return error.InvalidStream; - } - } else |err| return err; - - break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10); - }; - - return .{ - .unsub = .{ - .sid = sid, - .max_msgs = max_msgs, - }, - }; - } else return error.InvalidStream; + return parseUnsub(alloc, in); }, else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), } } }; +fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + const subject = try readSubject(alloc, in, .sub); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: std.ArrayList(u8) = .empty; + errdefer second.deinit(alloc); + var third: ?std.ArrayList(u8) = null; + errdefer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + const byte = try in.peekByte(); + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try second.append(alloc, byte); + in.toss(1); + continue :sw .in_second; + } + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try third.?.append(alloc, byte); + in.toss(1); + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .sub = .{ + .subject = subject, + .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, + .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), + }, + }; +} + +test parseSub { + { + var in: std.Io.Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: std.Io.Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: std.Io.Reader = .fixed(" foo q 1\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: std.Io.Reader = .fixed(" 1 q 1\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "1", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "$SRV.PING", + .queue_group = null, + .sid = "4", + }, + }, + res, + ); + } +} + +fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + var first: std.ArrayList(u8) = .empty; + errdefer first.deinit(alloc); + + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try first.append(alloc, byte); + in.toss(1); + } else |err| return err; + + while (in.peekByte()) |byte| { + if (!std.ascii.isWhitespace(byte) or byte == '\r') break; + in.toss(1); + } else |err| return err; + + if (try in.peekByte() == '\r') { + try expectStreamBytes(in, "\r\n"); + return .{ + .unsub = .{ + .sid = try first.toOwnedSlice(alloc), + }, + }; + } else { + var second: std.ArrayList(u8) = .empty; + defer second.deinit(alloc); + + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try second.append(alloc, byte); + in.toss(1); + } else |err| return err; + + try expectStreamBytes(in, "\r\n"); + return .{ + .unsub = .{ + .max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10), + .sid = try first.toOwnedSlice(alloc), + }, + }; + } +} + +test parseUnsub { + { + var in: std.Io.Reader = .fixed(" 1\r\n"); + var res = try parseUnsub(std.testing.allocator, &in); + defer res.unsub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .unsub = .{ + .sid = "1", + .max_msgs = null, + }, + }, + res, + ); + try std.testing.expectEqual(0, in.buffered().len); + } + + { + var in: std.Io.Reader = .fixed(" 1 1\r\n"); + var res = try parseUnsub(std.testing.allocator, &in); + defer res.unsub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .unsub = .{ + .sid = "1", + .max_msgs = 1, + }, + }, + res, + ); + try std.testing.expectEqual(0, in.buffered().len); + } +} + fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { try in.discardAll(1); // throw away space @@ -483,13 +627,16 @@ test parsePub { var res = try parsePub(std.testing.allocator, &in); defer res.@"pub".deinit(std.testing.allocator); try std.testing.expectEqualDeep( - Message{ .@"pub" = .{ - .subject = "foo", - .reply_to = null, - .payload = "bar", - } }, + Message{ + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "bar", + }, + }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } { @@ -497,13 +644,16 @@ test parsePub { var res = try parsePub(std.testing.allocator, &in); defer res.@"pub".deinit(std.testing.allocator); try std.testing.expectEqualDeep( - Message{ .@"pub" = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = "bar", - } }, + Message{ + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "bar", + }, + }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } // numeric reply subject @@ -512,13 +662,16 @@ test parsePub { var res = try parsePub(std.testing.allocator, &in); defer res.@"pub".deinit(std.testing.allocator); try std.testing.expectEqualDeep( - Message{ .@"pub" = .{ - .subject = "foo", - .reply_to = "5", - .payload = "bar", - } }, + Message{ + .@"pub" = .{ + .subject = "foo", + .reply_to = "5", + .payload = "bar", + }, + }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } } @@ -658,6 +811,7 @@ test parseHPub { }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } { @@ -677,6 +831,7 @@ test parseHPub { }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } { @@ -696,6 +851,7 @@ test parseHPub { }, res, ); + try std.testing.expectEqual(0, in.buffered().len); } } -- cgit