summaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/message_parser.zig340
1 files changed, 219 insertions, 121 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 5c86261..23f00ae 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -415,64 +415,82 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject);
- const second = blk: {
- // Drop whitespace
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- in.toss(1);
- } else break;
- } else |err| return err;
-
- var acc: std.ArrayList(u8) = .empty;
- errdefer acc.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try acc.append(alloc, byte);
- in.toss(1);
- } else |err| return err;
-
- break :blk try acc.toOwnedSlice(alloc);
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
};
- defer alloc.free(second);
- const byte_count: usize, const reply_to: ?[]const u8 =
- if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
+ var second: std.ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: ?std.ArrayList(u8) = null;
+ defer if (third) |*t| t.deinit(alloc);
+ var payload: std.Io.Writer.Allocating = .init(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (!std.ascii.isWhitespace(byte)) {
+ try second.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_second;
+ }
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isDigit(byte)) {
+ try third.?.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_third;
+ }
+ return error.InvalidStream;
+ },
+ .in_end => {
try expectStreamBytes(in, "\r\n");
- break :blk .{ s, null };
- } else |_| .{
- blk: {
- var byte_count_list: std.ArrayList(u8) = .empty;
- defer byte_count_list.deinit(alloc);
- try in.discardAll(1); // discard space
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
- }
- defer in.toss(1);
- if (std.ascii.isDigit(byte)) {
- try byte_count_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
- }
- } else |err| return err;
- break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
- },
- try alloc.dupe(u8, second),
+ },
+ }
+
+ const reply_to: ?[]const u8, const bytes: usize =
+ if (third) |t| .{
+ try alloc.dupe(u8, second.items),
+ try std.fmt.parseUnsigned(usize, t.items, 10),
+ } else .{
+ null,
+ try std.fmt.parseUnsigned(usize, second.items, 10),
};
- const payload = blk: {
- const bytes = try alloc.alloc(u8, byte_count);
- errdefer alloc.free(bytes);
- try in.readSliceAll(bytes);
- try expectStreamBytes(in, "\r\n");
- break :blk bytes;
- };
+ try in.streamExact(&payload.writer, bytes);
+ try expectStreamBytes(in, "\r\n");
return .{
.@"pub" = .{
.subject = subject,
- .payload = payload,
+ .payload = try payload.toOwnedSlice(),
.reply_to = reply_to,
},
};
@@ -515,7 +533,7 @@ test parsePub {
try std.testing.expectEqualDeep(
Message{ .@"pub" = .{
.subject = "foo",
- .reply_to = "reply.to",
+ .reply_to = "5",
.payload = "bar",
} },
res,
@@ -530,95 +548,175 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject);
- const second = blk: {
- // Drop whitespace
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- in.toss(1);
- } else break;
- } else |err| return err;
-
- var acc: std.ArrayList(u8) = .empty;
- errdefer acc.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try acc.append(alloc, byte);
- in.toss(1);
- } else |err| return err;
-
- break :blk try acc.toOwnedSlice(alloc);
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ after_third,
+ in_fourth,
+ in_end,
};
- errdefer alloc.free(second);
-
- const header_byte_count: usize, const reply_to: ?[]const u8 =
- if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
- try expectStreamBytes(in, "\r\n");
- break :blk .{ s, null };
- } else |_| .{
- blk: {
- var byte_count_list: std.ArrayList(u8) = .empty;
- defer byte_count_list.deinit(alloc);
- try in.discardAll(1); // discard space
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
- }
- defer in.toss(1);
- if (std.ascii.isDigit(byte)) {
- try byte_count_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
- }
- } else |err| return err;
- break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
- },
- second,
- };
-
- std.log.debug("buffered: '{s}'", .{in.buffered()});
- // Parse byte count
- const byte_count = blk: {
- var byte_count_list: std.ArrayList(u8) = .empty;
- defer byte_count_list.deinit(alloc);
- while (in.peekByte()) |byte| {
+ var second: std.ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: std.ArrayList(u8) = .empty;
+ defer third.deinit(alloc);
+ var fourth: ?std.ArrayList(u8) = null;
+ defer if (fourth) |*f| f.deinit(alloc);
+ var payload: std.Io.Writer.Allocating = .init(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
+ in.toss(1);
+ continue :sw .before_second;
}
- defer in.toss(1);
-
- if (std.ascii.isDigit(byte)) {
- try byte_count_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (!std.ascii.isWhitespace(byte)) {
+ try second.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_second;
}
- } else |err| return err;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ const byte = try in.peekByte();
+ if (!std.ascii.isWhitespace(byte)) {
+ try third.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_third;
+ }
+ continue :sw .after_third;
+ },
+ .after_third => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_third;
+ }
+ fourth = .empty;
+ continue :sw .in_fourth;
+ },
+ .in_fourth => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isDigit(byte)) {
+ try fourth.?.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_fourth;
+ }
+ return error.InvalidStream;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
- break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
- };
+ const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
+ if (fourth) |f| .{
+ try alloc.dupe(u8, second.items),
+ try std.fmt.parseUnsigned(usize, third.items, 10),
+ try std.fmt.parseUnsigned(usize, f.items, 10),
+ } else .{
+ null,
+ try std.fmt.parseUnsigned(usize, second.items, 10),
+ try std.fmt.parseUnsigned(usize, third.items, 10),
+ };
- const payload = blk: {
- const bytes = try alloc.alloc(u8, byte_count);
- errdefer alloc.free(bytes);
- try in.readSliceAll(bytes);
- try expectStreamBytes(in, "\r\n");
- break :blk bytes;
- };
+ try in.streamExact(&payload.writer, total_bytes);
+ try expectStreamBytes(in, "\r\n");
return .{
.hpub = .{
- .header_bytes = header_byte_count,
+ .header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,
- .payload = payload,
+ .payload = try payload.toOwnedSlice(),
.reply_to = reply_to,
},
},
};
}
+test parseHPub {
+ {
+ var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(std.testing.allocator, &in);
+ defer res.hpub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .hpub = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ },
+ },
+ },
+ res,
+ );
+ }
+
+ {
+ var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(std.testing.allocator, &in);
+ defer res.hpub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .hpub = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ },
+ },
+ },
+ res,
+ );
+ }
+
+ {
+ var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(std.testing.allocator, &in);
+ defer res.hpub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .hpub = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "6",
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ },
+ },
+ },
+ res,
+ );
+ }
+}
+
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
errdefer subject_list.deinit(alloc);