diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/server/message_parser.zig | 340 |
1 files changed, 219 insertions, 121 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 5c86261..23f00ae 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -415,64 +415,82 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const subject: []const u8 = try readSubject(alloc, in); errdefer alloc.free(subject); - const second = blk: { - // Drop whitespace - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - in.toss(1); - } else break; - } else |err| return err; - - var acc: std.ArrayList(u8) = .empty; - errdefer acc.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try acc.append(alloc, byte); - in.toss(1); - } else |err| return err; - - break :blk try acc.toOwnedSlice(alloc); + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, }; - defer alloc.free(second); - const byte_count: usize, const reply_to: ?[]const u8 = - if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { + var second: std.ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ?std.ArrayList(u8) = null; + defer if (third) |*t| t.deinit(alloc); + var payload: std.Io.Writer.Allocating = .init(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try second.append(alloc, byte); + in.toss(1); + continue :sw .in_second; + } + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isDigit(byte)) { + try third.?.append(alloc, byte); + in.toss(1); + continue :sw .in_third; + } + return error.InvalidStream; + }, + .in_end => { try expectStreamBytes(in, "\r\n"); - break :blk .{ s, null }; - } else |_| .{ - blk: { - var byte_count_list: std.ArrayList(u8) = .empty; - defer byte_count_list.deinit(alloc); - try in.discardAll(1); // discard space - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; - } - defer in.toss(1); - if (std.ascii.isDigit(byte)) { - try byte_count_list.append(alloc, byte); - } else { - return error.InvalidStream; - } - } else |err| return err; - break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); - }, - try alloc.dupe(u8, second), + }, + } + + const reply_to: ?[]const u8, const bytes: usize = + if (third) |t| .{ + try alloc.dupe(u8, second.items), + try std.fmt.parseUnsigned(usize, t.items, 10), + } else .{ + null, + try std.fmt.parseUnsigned(usize, second.items, 10), }; - const payload = blk: { - const bytes = try alloc.alloc(u8, byte_count); - errdefer alloc.free(bytes); - try in.readSliceAll(bytes); - try expectStreamBytes(in, "\r\n"); - break :blk bytes; - }; + try in.streamExact(&payload.writer, bytes); + try expectStreamBytes(in, "\r\n"); return .{ .@"pub" = .{ .subject = subject, - .payload = payload, + .payload = try payload.toOwnedSlice(), .reply_to = reply_to, }, }; @@ -515,7 +533,7 @@ test parsePub { try std.testing.expectEqualDeep( Message{ .@"pub" = .{ .subject = "foo", - .reply_to = "reply.to", + .reply_to = "5", .payload = "bar", } }, res, @@ -530,95 +548,175 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const subject: []const u8 = try readSubject(alloc, in); errdefer alloc.free(subject); - const second = blk: { - // Drop whitespace - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - in.toss(1); - } else break; - } else |err| return err; - - var acc: std.ArrayList(u8) = .empty; - errdefer acc.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try acc.append(alloc, byte); - in.toss(1); - } else |err| return err; - - break :blk try acc.toOwnedSlice(alloc); + const States = enum { + before_second, + in_second, + after_second, + in_third, + after_third, + in_fourth, + in_end, }; - errdefer alloc.free(second); - - const header_byte_count: usize, const reply_to: ?[]const u8 = - if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { - try expectStreamBytes(in, "\r\n"); - break :blk .{ s, null }; - } else |_| .{ - blk: { - var byte_count_list: std.ArrayList(u8) = .empty; - defer byte_count_list.deinit(alloc); - try in.discardAll(1); // discard space - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; - } - defer in.toss(1); - if (std.ascii.isDigit(byte)) { - try byte_count_list.append(alloc, byte); - } else { - return error.InvalidStream; - } - } else |err| return err; - break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); - }, - second, - }; - - std.log.debug("buffered: '{s}'", .{in.buffered()}); - // Parse byte count - const byte_count = blk: { - var byte_count_list: std.ArrayList(u8) = .empty; - defer byte_count_list.deinit(alloc); - while (in.peekByte()) |byte| { + var second: std.ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: std.ArrayList(u8) = .empty; + defer third.deinit(alloc); + var fourth: ?std.ArrayList(u8) = null; + defer if (fourth) |*f| f.deinit(alloc); + var payload: std.Io.Writer.Allocating = .init(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; + in.toss(1); + continue :sw .before_second; } - defer in.toss(1); - - if (std.ascii.isDigit(byte)) { - try byte_count_list.append(alloc, byte); - } else { - return error.InvalidStream; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try second.append(alloc, byte); + in.toss(1); + continue :sw .in_second; } - } else |err| return err; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try third.append(alloc, byte); + in.toss(1); + continue :sw .in_third; + } + continue :sw .after_third; + }, + .after_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_third; + } + fourth = .empty; + continue :sw .in_fourth; + }, + .in_fourth => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isDigit(byte)) { + try fourth.?.append(alloc, byte); + in.toss(1); + continue :sw .in_fourth; + } + return error.InvalidStream; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } - break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); - }; + const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = + if (fourth) |f| .{ + try alloc.dupe(u8, second.items), + try std.fmt.parseUnsigned(usize, third.items, 10), + try std.fmt.parseUnsigned(usize, f.items, 10), + } else .{ + null, + try std.fmt.parseUnsigned(usize, second.items, 10), + try std.fmt.parseUnsigned(usize, third.items, 10), + }; - const payload = blk: { - const bytes = try alloc.alloc(u8, byte_count); - errdefer alloc.free(bytes); - try in.readSliceAll(bytes); - try expectStreamBytes(in, "\r\n"); - break :blk bytes; - }; + try in.streamExact(&payload.writer, total_bytes); + try expectStreamBytes(in, "\r\n"); return .{ .hpub = .{ - .header_bytes = header_byte_count, + .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, - .payload = payload, + .payload = try payload.toOwnedSlice(), .reply_to = reply_to, }, }, }; } +test parseHPub { + { + var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(std.testing.allocator, &in); + defer res.hpub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .hpub = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + }, + res, + ); + } + + { + var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(std.testing.allocator, &in); + defer res.hpub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .hpub = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + }, + res, + ); + } + + { + var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(std.testing.allocator, &in); + defer res.hpub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .hpub = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "6", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + }, + res, + ); + } +} + fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); errdefer subject_list.deinit(alloc); |
