summaryrefslogtreecommitdiff
path: root/src/Server/parse.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server/parse.zig')
-rw-r--r--src/Server/parse.zig115
1 files changed, 59 insertions, 56 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index b8d979a..cda5985 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -126,54 +126,59 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
while (true) {
- var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
-
- if (iter.next()) |subject| {
- if (iter.next()) |second| {
- if (in.buffered().len > iter.index) {
- if (in.buffered()[iter.index] == '\r') {
- const bytes_str = second;
- const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;
-
- if (in.buffered().len < iter.index + bytes + 4) {
- try in.fill(iter.index + bytes + 4);
- // Fill may shift buffer, so we have to retokenize it.
- continue;
- }
-
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + 2);
- defer in.toss(2);
- return .{
- .subject = subject,
- .reply_to = null,
- .payload = in.take(bytes) catch unreachable,
- };
- }
-
- const reply_to = second;
- if (iter.next()) |bytes_str| {
- const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n");
+ const subject = iter.next() orelse {
+ try in.fillMore();
+ continue;
+ };
+ const second = iter.next() orelse {
+ try in.fillMore();
+ continue;
+ };
+ if (in.buffered().len == iter.index) {
+ try in.fillMore();
+ continue;
+ }
+ if (in.buffered()[iter.index] == '\r') {
+ const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream;
+ if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
+ try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
+ continue;
+ }
+ in.toss(iter.index + "\r\n".len);
+ return .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = in.take(bytes + 2) catch unreachable,
+ };
+ }
- if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
- if (in.buffered().len < iter.index + bytes + 4) {
- try in.fill(iter.index + bytes + 4);
- // Fill may shift buffer, so we have to retokenize it.
- continue;
- }
+ switch (in.buffered()[iter.index]) {
+ '\t', ' ' => {
+ const reply_to = second;
+ const bytes = parseUnsigned(usize, iter.next() orelse {
+ try in.fillMore();
+ continue;
+ }, 10) catch return error.InvalidStream;
+
+ if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') {
+ try in.fillMore();
+ continue;
+ }
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + 2);
- defer in.toss(2);
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .payload = in.take(bytes) catch unreachable,
- };
- }
- }
+ if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
+ try in.fillMore();
+ continue;
}
- }
+
+ in.toss(iter.index + "\r\n".len);
+ return .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = in.take(bytes + 2) catch unreachable,
+ };
+ },
+ else => {},
}
try in.fillMore();
@@ -190,7 +195,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = "bar",
- .payload = "hi",
+ .payload = "hi\r\n",
},
try @"pub"(&in.interface),
);
@@ -205,7 +210,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi",
+ .payload = "hi\r\n",
},
try @"pub"(&in.interface),
);
@@ -221,7 +226,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi",
+ .payload = "hi\r\n",
},
try @"pub"(&in.interface),
);
@@ -238,7 +243,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi",
+ .payload = "hi\r\n",
},
try @"pub"(&in.interface),
);
@@ -256,7 +261,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi",
+ .payload = "hi\r\n",
},
try @"pub"(&in.interface),
);
@@ -545,13 +550,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
// 4 bytes for CRLF on either side of headers and payload.
in.toss(iter.index + 2);
- defer in.toss(2);
return .{
.header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,
.reply_to = null,
- .payload = in.take(total_bytes) catch unreachable,
+ .payload = in.take(total_bytes + 2) catch unreachable,
},
};
}
@@ -571,13 +575,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
// 4 bytes for CRLF on either side of headers and payload.
in.toss(iter.index + 2);
- defer in.toss(2);
return .{
.header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,
.reply_to = reply_to,
- .payload = in.take(total_bytes) catch unreachable,
+ .payload = in.take(total_bytes + 2) catch unreachable,
},
};
}
@@ -602,7 +605,7 @@ test hpub {
.@"pub" = .{
.subject = "foo",
.reply_to = null,
- .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
},
},
try hpub(&in.interface),
@@ -620,7 +623,7 @@ test hpub {
.@"pub" = .{
.subject = "foo",
.reply_to = "reply",
- .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
},
},
try hpub(&in.interface),