diff options
Diffstat (limited to 'src/Server/parse.zig')
| -rw-r--r-- | src/Server/parse.zig | 115 |
1 files changed, 59 insertions, 56 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig index b8d979a..cda5985 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -126,54 +126,59 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub { // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 while (true) { - var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); - - if (iter.next()) |subject| { - if (iter.next()) |second| { - if (in.buffered().len > iter.index) { - if (in.buffered()[iter.index] == '\r') { - const bytes_str = second; - const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; - - if (in.buffered().len < iter.index + bytes + 4) { - try in.fill(iter.index + bytes + 4); - // Fill may shift buffer, so we have to retokenize it. - continue; - } - - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .subject = subject, - .reply_to = null, - .payload = in.take(bytes) catch unreachable, - }; - } - - const reply_to = second; - if (iter.next()) |bytes_str| { - const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n"); + const subject = iter.next() orelse { + try in.fillMore(); + continue; + }; + const second = iter.next() orelse { + try in.fillMore(); + continue; + }; + if (in.buffered().len == iter.index) { + try in.fillMore(); + continue; + } + if (in.buffered()[iter.index] == '\r') { + const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream; + if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { + try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len); + continue; + } + in.toss(iter.index + "\r\n".len); + return .{ + .subject = subject, + .reply_to = null, + .payload = in.take(bytes + 2) catch unreachable, + }; + } - if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { - if (in.buffered().len < iter.index + bytes + 4) { - try in.fill(iter.index + bytes + 4); - // Fill may shift buffer, so we have to retokenize it. - continue; - } + switch (in.buffered()[iter.index]) { + '\t', ' ' => { + const reply_to = second; + const bytes = parseUnsigned(usize, iter.next() orelse { + try in.fillMore(); + continue; + }, 10) catch return error.InvalidStream; + + if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') { + try in.fillMore(); + continue; + } - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .subject = subject, - .reply_to = reply_to, - .payload = in.take(bytes) catch unreachable, - }; - } - } + if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { + try in.fillMore(); + continue; } - } + + in.toss(iter.index + "\r\n".len); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(bytes + 2) catch unreachable, + }; + }, + else => {}, } try in.fillMore(); @@ -190,7 +195,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = "bar", - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -205,7 +210,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -221,7 +226,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -238,7 +243,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -256,7 +261,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -545,13 +550,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub { // 4 bytes for CRLF on either side of headers and payload. in.toss(iter.index + 2); - defer in.toss(2); return .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, .reply_to = null, - .payload = in.take(total_bytes) catch unreachable, + .payload = in.take(total_bytes + 2) catch unreachable, }, }; } @@ -571,13 +575,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub { // 4 bytes for CRLF on either side of headers and payload. in.toss(iter.index + 2); - defer in.toss(2); return .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, .reply_to = reply_to, - .payload = in.take(total_bytes) catch unreachable, + .payload = in.take(total_bytes + 2) catch unreachable, }, }; } @@ -602,7 +605,7 @@ test hpub { .@"pub" = .{ .subject = "foo", .reply_to = null, - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, try hpub(&in.interface), @@ -620,7 +623,7 @@ test hpub { .@"pub" = .{ .subject = "foo", .reply_to = "reply", - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, try hpub(&in.interface), |
