From a4ec798521bda5564e2dc96c2184100116b54d28 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sat, 3 Jan 2026 03:16:51 +0000 Subject: Fix parse errors, ownership errors. --- src/server/message_parser.zig | 110 ++++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 37 deletions(-) (limited to 'src/server/message_parser.zig') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 3adf704..54149cb 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -235,8 +235,6 @@ pub const Message = union(enum) { break :blk .initBuffer(&buf); }; - std.log.debug("buffered: '{s}'", .{in.buffered()}); - while (in.peekByte()) |byte| { if (std.ascii.isUpper(byte)) { try operation_string.appendBounded(byte); @@ -362,6 +360,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { } try third.?.append(alloc, byte); in.toss(1); + continue :sw .in_third; }, .in_end => { try expectStreamBytes(in, "\r\n"); @@ -453,49 +452,86 @@ test parseSub { res, ); } + { + var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } } fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { - try in.discardAll(1); // throw away space + const States = enum { + before_first, + in_first, + after_first, + in_second, + in_end, + }; + var first: std.ArrayList(u8) = .empty; errdefer first.deinit(alloc); + var second: ?std.ArrayList(u8) = null; + defer if (second) |*s| s.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try first.append(alloc, byte); - in.toss(1); - } else |err| return err; - - while (in.peekByte()) |byte| { - if (!std.ascii.isWhitespace(byte) or byte == '\r') break; - in.toss(1); - } else |err| return err; - - if (try in.peekByte() == '\r') { - try expectStreamBytes(in, "\r\n"); - return .{ - .unsub = .{ - .sid = try first.toOwnedSlice(alloc), - }, - }; - } else { - var second: std.ArrayList(u8) = .empty; - defer second.deinit(alloc); - - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try second.append(alloc, byte); + sw: switch (@as(States, .before_first)) { + .before_first => { + const byte = try in.peekByte(); + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .before_first; + } + continue :sw .in_first; + }, + .in_first => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try first.append(alloc, byte); + in.toss(1); + continue :sw .in_first; + } + continue :sw .after_first; + }, + .after_first => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_first; + } + second = .empty; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try second.?.append(alloc, byte); in.toss(1); - } else |err| return err; - - try expectStreamBytes(in, "\r\n"); - return .{ - .unsub = .{ - .max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10), - .sid = try first.toOwnedSlice(alloc), - }, - }; + continue :sw .in_second; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, } + + return .{ + .unsub = .{ + .sid = try first.toOwnedSlice(alloc), + .max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null, + }, + }; } test parseUnsub { -- cgit