Diffstat (limited to 'src/Server/parse.zig')
-rw-r--r--  src/Server/parse.zig  137
1 file changed, 88 insertions, 49 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 6ae99a5..2aca6e8 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -45,6 +45,8 @@ pub fn control(in: *Reader) !message.Control {
break :blk min_len;
};
std.debug.assert(in.buffer.len >= longest_ctrl);
+ // Wait until at least enough text to parse the shortest control value is available
+ try in.fill(3);
while (true) {
var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
if (iter.next()) |str| {
@@ -55,6 +57,7 @@ pub fn control(in: *Reader) !message.Control {
return error.InvalidControl;
}
}
+ log.debug("filling more in control.", .{});
try in.fillMore();
}
}
@@ -146,7 +149,7 @@ pub fn @"pub"(in: *Reader) !Message.Pub {
if (iter.next()) |bytes_str| {
const bytes = try parseUnsigned(usize, bytes_str, 10);
- if (in.buffered()[iter.index] == '\r') {
+ if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
if (in.buffered().len < iter.index + bytes + 4) {
try in.fill(iter.index + bytes + 4);
// Fill may shift buffer, so we have to retokenize it.
@@ -287,7 +290,7 @@ pub fn sub(in: *Reader) !Message.Sub {
const queue_group = second;
if (iter.next()) |sid| {
- if (in.buffered()[iter.index] == '\r') {
+ if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
if (in.buffered().len < iter.index + 2) {
try in.fill(iter.index + 2);
// Fill may shift buffer, so we have to retokenize it.
@@ -380,27 +383,11 @@ pub fn unsub(in: *Reader) !Message.Unsub {
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
while (true) {
- var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n");
if (iter.next()) |sid| {
- if (in.buffered()[iter.index] == '\r') {
- if (in.buffered().len < iter.index + 2) {
- try in.fill(iter.index + 2);
- // Fill may shift buffer, so we have to retokenize it.
- continue;
- }
-
- // 2 bytes for CRLF at the end.
- in.toss(iter.index + 2);
- return .{
- .sid = sid,
- .max_msgs = null,
- };
- }
- if (iter.next()) |max_msgs_str| {
+ if (in.buffered().len > iter.index) {
if (in.buffered()[iter.index] == '\r') {
- const max_msgs = try parseUnsigned(usize, max_msgs_str, 10);
-
if (in.buffered().len < iter.index + 2) {
try in.fill(iter.index + 2);
// Fill may shift buffer, so we have to retokenize it.
@@ -409,16 +396,48 @@ pub fn unsub(in: *Reader) !Message.Unsub {
// 2 bytes for CRLF at the end.
in.toss(iter.index + 2);
-
return .{
.sid = sid,
- .max_msgs = max_msgs,
+ .max_msgs = null,
};
}
+ if (iter.next()) |max_msgs_str| {
+ if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
+ const max_msgs = try parseUnsigned(usize, max_msgs_str, 10);
+
+ if (in.buffered().len < iter.index + 2) {
+ try in.fill(iter.index + 2);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
+ }
+
+ // 2 bytes for CRLF at the end.
+ in.toss(iter.index + 2);
+
+ return .{
+ .sid = sid,
+ .max_msgs = max_msgs,
+ };
+ }
+ }
}
}
- try in.fillMore();
+ in.fillMore() catch |err| switch (err) {
+ error.EndOfStream => {
+ iter.reset();
+ const sid = iter.next() orelse return error.EndOfStream;
+ const max_msgs = if (iter.next()) |max_msgs_str| blk: {
+ log.debug("max_msgs: {any}", .{max_msgs_str});
+ break :blk try parseUnsigned(usize, max_msgs_str, 10);
+ } else null;
+ return .{
+ .sid = sid,
+ .max_msgs = max_msgs,
+ };
+ },
+ else => |e| return e,
+ };
}
}
@@ -478,6 +497,20 @@ test unsub {
try unsub(&in.interface),
);
}
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = " 1\r" },
+ .{ .buffer = "\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "1",
+ .max_msgs = null,
+ },
+ try unsub(&in.interface),
+ );
+ }
}
/// The return value is owned by the reader passed to this function.
@@ -521,28 +554,26 @@ pub fn hpub(in: *Reader) !Message.HPub {
const reply_to = second;
const header_bytes_str = third;
if (iter.next()) |total_bytes_str| {
- if (in.buffered().len > iter.index) {
- if (in.buffered()[iter.index] == '\r') {
- const header_bytes = try parseUnsigned(usize, header_bytes_str, 10);
- const total_bytes = try parseUnsigned(usize, total_bytes_str, 10);
-
- if (in.buffered().len < iter.index + total_bytes + 4) {
- try in.fill(iter.index + total_bytes + 4);
- continue;
- }
-
- // 4 bytes for CRLF on either side of headers and payload.
- in.toss(iter.index + 2);
- defer in.toss(2);
- return .{
- .header_bytes = header_bytes,
- .@"pub" = .{
- .subject = subject,
- .reply_to = reply_to,
- .payload = in.take(total_bytes) catch unreachable,
- },
- };
+ if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
+ const header_bytes = try parseUnsigned(usize, header_bytes_str, 10);
+ const total_bytes = try parseUnsigned(usize, total_bytes_str, 10);
+
+ if (in.buffered().len < iter.index + total_bytes + 4) {
+ try in.fill(iter.index + total_bytes + 4);
+ continue;
}
+
+ // 4 bytes for CRLF on either side of headers and payload.
+ in.toss(iter.index + 2);
+ defer in.toss(2);
+ return .{
+ .header_bytes = header_bytes,
+ .@"pub" = .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = in.take(total_bytes) catch unreachable,
+ },
+ };
}
}
}
@@ -593,7 +624,12 @@ test hpub {
// TODO: more tests
}
-pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
+pub fn connect(alloc: Allocator, in: *Reader) error{
+ EndOfStream,
+ ReadFailed,
+ OutOfMemory,
+ InvalidStream,
+}!Message.Connect {
// for storing the json string
var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit();
@@ -607,18 +643,21 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
try in.discardAll(1); // throw away space
// Should read the next JSON object to the fixed buffer writer.
- _ = try in.streamDelimiter(connect_string_writer, '}');
- try connect_string_writer.writeByte('}');
+ _ = in.streamDelimiter(connect_string_writer, '}') catch |err| switch (err) {
+ error.WriteFailed => return error.OutOfMemory,
+ else => |e| return e,
+ };
+ connect_string_writer.writeByte('}') catch return error.OutOfMemory;
try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
const connect_str = try connect_string_writer_allocating.toOwnedSlice();
defer alloc.free(connect_str);
- const res = try std.json.parseFromSliceLeaky(
+ const res = std.json.parseFromSliceLeaky(
Message.Connect,
connect_allocator,
connect_str,
.{ .allocate = .alloc_always },
- );
+ ) catch return error.InvalidStream;
return res.dupe(alloc);
}