diff options
Diffstat (limited to 'src/Server')
| -rw-r--r-- | src/Server/parse.zig | 109 |
1 files changed, 83 insertions, 26 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 6f8281b..f47b671 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -51,6 +51,12 @@ pub fn control(in: *Reader) Error!message.Control { break :blk min_len; }; std.debug.assert(in.buffer.len >= longest_ctrl); + if (in.seek == in.end) { + // If there is nothing in the read buffer, reset it to start from the beginning. + // This will minimize rebases. + in.seek = 0; + in.end = 0; + } // Wait until at least the enough text to parse the shortest control value is available try in.fill(3); while (true) { @@ -60,10 +66,10 @@ pub fn control(in: *Reader) Error!message.Control { in.toss(str.len); return ctrl; } else if (str.len >= longest_ctrl) { + log.debug("ctrl too long: '{s}'\tbytes: {d}", .{ str, str.len }); return error.InvalidStream; } } - log.debug("filling more in control.", .{}); try in.fillMore(); } } @@ -121,7 +127,9 @@ test control { /// The return value is owned by the reader passed to this function. /// Operations that modify the readers buffer invalidates this value. -pub fn @"pub"(in: *Reader) Error!Message.Pub { +/// The arena_allocator is used to store the payload if it can't fit +/// in the readers buffer. +pub fn @"pub"(in: *Reader, arena_allocator: *std.heap.ArenaAllocator) (error{OutOfMemory} || Error)!Message.Pub { // TODO: Add pedantic option. // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 @@ -140,29 +148,53 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub { continue; } if (in.buffered()[iter.index] == '\r') { - const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream; - log.debug("received len: {d}", .{in.buffered().len}); - log.debug("headers len: {d}\tbytes: {d}", .{ iter.index, bytes }); - log.debug("buffer len: {d}", .{in.buffer.len}); - if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { - try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len); - continue; - } - in.toss(iter.index + "\r\n".len); - return .{ - .subject = subject, - .reply_to = null, - .payload = in.take(bytes + 2) catch unreachable, + const bytes = parseUnsigned(usize, second, 10) catch { + log.debug("pub can't parse bytes: '{s}'", .{second}); + return error.InvalidStream; }; + // if we can fit the payload and the headers in our read buffer + // reference the read buffer. + if (in.buffer.len > iter.index + bytes + "\r\n".len + "\r\n".len) { + // TODO: Can we use >=? + if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { + try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len); + continue; + } + in.toss(iter.index + "\r\n".len); + return .{ + .subject = subject, + .reply_to = null, + .payload = in.take(bytes + 2) catch unreachable, + }; + } else { + // else alloc the payload + const alloc = arena_allocator.allocator(); + // We have to dupe the subject because we will not retain it in the read buffer + // as we accumulate the payload. + const subject_alloc = try alloc.dupe(u8, subject); + in.toss(iter.index + "\r\n".len); + const payload = try in.readAlloc(alloc, bytes + 2); + return .{ + .subject = subject_alloc, + .reply_to = null, + .payload = payload, + }; + } + + if (in.end - in.seek - "\r\n".len > bytes) {} else {} } switch (in.buffered()[iter.index]) { '\t', ' ' => { const reply_to = second; - const bytes = parseUnsigned(usize, iter.next() orelse { + const third = iter.next() orelse { try in.fillMore(); continue; - }, 10) catch return error.InvalidStream; + }; + const bytes = parseUnsigned(usize, third, 10) catch { + log.debug("pub can't parse bytes (with reply): '{s}'", .{third}); + return error.InvalidStream; + }; if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') { try in.fillMore(); @@ -417,7 +449,10 @@ pub fn unsub(in: *Reader) Error!Message.Unsub { } if (iter.next()) |max_msgs_str| { if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { - const max_msgs = parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream; + const max_msgs = parseUnsigned(usize, max_msgs_str, 10) catch { + log.debug("unsub can't parse max_msgs: '{s}'", .{max_msgs_str}); + return error.InvalidStream; + }; if (in.buffered().len < iter.index + 2) { try in.fill(iter.index + 2); @@ -443,7 +478,10 @@ pub fn unsub(in: *Reader) Error!Message.Unsub { const sid = iter.next() orelse return error.EndOfStream; const max_msgs = if (iter.next()) |max_msgs_str| blk: { log.debug("max_msgs: {any}", .{max_msgs_str}); - break :blk parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream; + break :blk parseUnsigned(usize, max_msgs_str, 10) catch { + log.debug("unsub can't parse bytes (eos): '{s}'", .{max_msgs_str}); + return error.InvalidStream; + }; } else null; return .{ .sid = sid, @@ -529,7 +567,10 @@ test unsub { /// The return value is owned by the reader passed to this function. /// Operations that modify the readers buffer invalidates this value. -pub fn hpub(in: *Reader) Error!Message.HPub { +/// The arena_allocator is used to store the payload if it can't fit +/// in the readers buffer. +pub fn hpub(in: *Reader, arena_allocator: *std.heap.ArenaAllocator) (error{OutOfMemory} || Error)!Message.HPub { + _ = arena_allocator; // TODO: Add pedantic option. // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 while (true) { @@ -543,8 +584,14 @@ pub fn hpub(in: *Reader) Error!Message.HPub { const header_bytes_str = second; const total_bytes_str = third; - const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; - const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream; + const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch { + log.debug("hpub can't parse header bytes: '{s}'", .{header_bytes_str}); + return error.InvalidStream; + }; + const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch { + log.debug("hpub can't parse total bytes: '{s}'", .{header_bytes_str}); + return error.InvalidStream; + }; if (in.buffered().len < iter.index + total_bytes + 4) { try in.fill(iter.index + total_bytes + 4); @@ -568,8 +615,14 @@ pub fn hpub(in: *Reader) Error!Message.HPub { const header_bytes_str = third; if (iter.next()) |total_bytes_str| { if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { - const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; - const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream; + const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch { + log.debug("hpub can't parse header bytes (with reply): '{s}'", .{header_bytes_str}); + return error.InvalidStream; + }; + const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch { + log.debug("hpub can't parse total bytes (with reply): '{s}'", .{header_bytes_str}); + return error.InvalidStream; + }; if (in.buffered().len < iter.index + total_bytes + 4) { try in.fill(iter.index + total_bytes + 4); @@ -664,14 +717,18 @@ pub fn connect(alloc: Allocator, in: *Reader) (error{OutOfMemory} || Error)!Mess connect_allocator, connect_str, .{ .allocate = .alloc_always }, - ) catch return error.InvalidStream; + ) catch { + log.debug("connect can't parse json body: '{s}'", .{connect_str}); + return error.InvalidStream; + }; return res.dupe(alloc); } inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { - @branchHint(.unlikely); + @branchHint(.cold); + log.debug("expectStreamBytes wrong bytes", .{}); return error.InvalidStream; } } |
