diff options
Diffstat (limited to 'src/Server/parse.zig')
| -rw-r--r-- | src/Server/parse.zig | 137 |
1 files changed, 88 insertions, 49 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 6ae99a5..2aca6e8 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -45,6 +45,8 @@ pub fn control(in: *Reader) !message.Control { break :blk min_len; }; std.debug.assert(in.buffer.len >= longest_ctrl); + // Wait until at least the enough text to parse the shortest control value is available + try in.fill(3); while (true) { var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); if (iter.next()) |str| { @@ -55,6 +57,7 @@ pub fn control(in: *Reader) !message.Control { return error.InvalidControl; } } + log.debug("filling more in control.", .{}); try in.fillMore(); } } @@ -146,7 +149,7 @@ pub fn @"pub"(in: *Reader) !Message.Pub { if (iter.next()) |bytes_str| { const bytes = try parseUnsigned(usize, bytes_str, 10); - if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { if (in.buffered().len < iter.index + bytes + 4) { try in.fill(iter.index + bytes + 4); // Fill may shift buffer, so we have to retokenize it. @@ -287,7 +290,7 @@ pub fn sub(in: *Reader) !Message.Sub { const queue_group = second; if (iter.next()) |sid| { - if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { if (in.buffered().len < iter.index + 2) { try in.fill(iter.index + 2); // Fill may shift buffer, so we have to retokenize it. @@ -380,27 +383,11 @@ pub fn unsub(in: *Reader) !Message.Unsub { // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 while (true) { - var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n"); if (iter.next()) |sid| { - if (in.buffered()[iter.index] == '\r') { - if (in.buffered().len < iter.index + 2) { - try in.fill(iter.index + 2); - // Fill may shift buffer, so we have to retokenize it. - continue; - } - - // 2 bytes for CRLF at the end. - in.toss(iter.index + 2); - return .{ - .sid = sid, - .max_msgs = null, - }; - } - if (iter.next()) |max_msgs_str| { + if (in.buffered().len > iter.index) { if (in.buffered()[iter.index] == '\r') { - const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); - if (in.buffered().len < iter.index + 2) { try in.fill(iter.index + 2); // Fill may shift buffer, so we have to retokenize it. @@ -409,16 +396,48 @@ pub fn unsub(in: *Reader) !Message.Unsub { // 2 bytes for CRLF at the end. in.toss(iter.index + 2); - return .{ .sid = sid, - .max_msgs = max_msgs, + .max_msgs = null, }; } + if (iter.next()) |max_msgs_str| { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { + const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); + + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + + // 2 bytes for CRLF at the end. + in.toss(iter.index + 2); + + return .{ + .sid = sid, + .max_msgs = max_msgs, + }; + } + } } } - try in.fillMore(); + in.fillMore() catch |err| switch (err) { + error.EndOfStream => { + iter.reset(); + const sid = iter.next() orelse return error.EndOfStream; + const max_msgs = if (iter.next()) |max_msgs_str| blk: { + log.debug("max_msgs: {any}", .{max_msgs_str}); + break :blk try parseUnsigned(usize, max_msgs_str, 10); + } else null; + return .{ + .sid = sid, + .max_msgs = max_msgs, + }; + }, + else => |e| return e, + }; } } @@ -478,6 +497,20 @@ test unsub { try unsub(&in.interface), ); } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " 1\r" }, + .{ .buffer = "\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "1", + .max_msgs = null, + }, + try unsub(&in.interface), + ); + } } /// The return value is owned by the reader passed to this function. @@ -521,28 +554,26 @@ pub fn hpub(in: *Reader) !Message.HPub { const reply_to = second; const header_bytes_str = third; if (iter.next()) |total_bytes_str| { - if (in.buffered().len > iter.index) { - if (in.buffered()[iter.index] == '\r') { - const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); - const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); - - if (in.buffered().len < iter.index + total_bytes + 4) { - try in.fill(iter.index + total_bytes + 4); - continue; - } - - // 4 bytes for CRLF on either side of headers and payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .reply_to = reply_to, - .payload = in.take(total_bytes) catch unreachable, - }, - }; + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); + + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; } + + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(total_bytes) catch unreachable, + }, + }; } } } @@ -593,7 +624,12 @@ test hpub { // TODO: more tests } -pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { +pub fn connect(alloc: Allocator, in: *Reader) error{ + EndOfStream, + ReadFailed, + OutOfMemory, + InvalidStream, +}!Message.Connect { // for storing the json string var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); @@ -607,18 +643,21 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { try in.discardAll(1); // throw away space // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); + _ = in.streamDelimiter(connect_string_writer, '}') catch |err| switch (err) { + error.WriteFailed => return error.OutOfMemory, + else => |e| return e, + }; + connect_string_writer.writeByte('}') catch return error.OutOfMemory; try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' const connect_str = try connect_string_writer_allocating.toOwnedSlice(); defer alloc.free(connect_str); - const res = try std.json.parseFromSliceLeaky( + const res = std.json.parseFromSliceLeaky( Message.Connect, connect_allocator, connect_str, .{ .allocate = .alloc_always }, - ); + ) catch return error.InvalidStream; return res.dupe(alloc); } |
