summaryrefslogtreecommitdiff
path: root/src/Server
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server')
-rw-r--r--src/Server/parse.zig109
1 files changed, 83 insertions, 26 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 6f8281b..f47b671 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -51,6 +51,12 @@ pub fn control(in: *Reader) Error!message.Control {
break :blk min_len;
};
std.debug.assert(in.buffer.len >= longest_ctrl);
+ if (in.seek == in.end) {
+ // If there is nothing in the read buffer, reset it to start from the beginning.
+ // This will minimize rebases.
+ in.seek = 0;
+ in.end = 0;
+ }
// Wait until at least the enough text to parse the shortest control value is available
try in.fill(3);
while (true) {
@@ -60,10 +66,10 @@ pub fn control(in: *Reader) Error!message.Control {
in.toss(str.len);
return ctrl;
} else if (str.len >= longest_ctrl) {
+ log.debug("ctrl too long: '{s}'\tbytes: {d}", .{ str, str.len });
return error.InvalidStream;
}
}
- log.debug("filling more in control.", .{});
try in.fillMore();
}
}
@@ -121,7 +127,9 @@ test control {
/// The return value is owned by the reader passed to this function.
/// Operations that modify the readers buffer invalidates this value.
-pub fn @"pub"(in: *Reader) Error!Message.Pub {
+/// The arena_allocator is used to store the payload if it can't fit
+/// in the readers buffer.
+pub fn @"pub"(in: *Reader, arena_allocator: *std.heap.ArenaAllocator) (error{OutOfMemory} || Error)!Message.Pub {
// TODO: Add pedantic option.
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
@@ -140,29 +148,53 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
continue;
}
if (in.buffered()[iter.index] == '\r') {
- const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream;
- log.debug("received len: {d}", .{in.buffered().len});
- log.debug("headers len: {d}\tbytes: {d}", .{ iter.index, bytes });
- log.debug("buffer len: {d}", .{in.buffer.len});
- if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
- try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
- continue;
- }
- in.toss(iter.index + "\r\n".len);
- return .{
- .subject = subject,
- .reply_to = null,
- .payload = in.take(bytes + 2) catch unreachable,
+ const bytes = parseUnsigned(usize, second, 10) catch {
+ log.debug("pub can't parse bytes: '{s}'", .{second});
+ return error.InvalidStream;
};
+ // if we can fit the payload and the headers in our read buffer
+ // reference the read buffer.
+ if (in.buffer.len > iter.index + bytes + "\r\n".len + "\r\n".len) {
+ // TODO: Can we use >=?
+ if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
+ try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
+ continue;
+ }
+ in.toss(iter.index + "\r\n".len);
+ return .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = in.take(bytes + 2) catch unreachable,
+ };
+ } else {
+ // else alloc the payload
+ const alloc = arena_allocator.allocator();
+ // We have to dupe the subject because we will not retain it in the read buffer
+ // as we accumulate the payload.
+ const subject_alloc = try alloc.dupe(u8, subject);
+ in.toss(iter.index + "\r\n".len);
+ const payload = try in.readAlloc(alloc, bytes + 2);
+ return .{
+ .subject = subject_alloc,
+ .reply_to = null,
+ .payload = payload,
+ };
+ }
+
+ if (in.end - in.seek - "\r\n".len > bytes) {} else {}
}
switch (in.buffered()[iter.index]) {
'\t', ' ' => {
const reply_to = second;
- const bytes = parseUnsigned(usize, iter.next() orelse {
+ const third = iter.next() orelse {
try in.fillMore();
continue;
- }, 10) catch return error.InvalidStream;
+ };
+ const bytes = parseUnsigned(usize, third, 10) catch {
+ log.debug("pub can't parse bytes (with reply): '{s}'", .{third});
+ return error.InvalidStream;
+ };
if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') {
try in.fillMore();
@@ -417,7 +449,10 @@ pub fn unsub(in: *Reader) Error!Message.Unsub {
}
if (iter.next()) |max_msgs_str| {
if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
- const max_msgs = parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream;
+ const max_msgs = parseUnsigned(usize, max_msgs_str, 10) catch {
+ log.debug("unsub can't parse max_msgs: '{s}'", .{max_msgs_str});
+ return error.InvalidStream;
+ };
if (in.buffered().len < iter.index + 2) {
try in.fill(iter.index + 2);
@@ -443,7 +478,10 @@ pub fn unsub(in: *Reader) Error!Message.Unsub {
const sid = iter.next() orelse return error.EndOfStream;
const max_msgs = if (iter.next()) |max_msgs_str| blk: {
log.debug("max_msgs: {any}", .{max_msgs_str});
- break :blk parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream;
+ break :blk parseUnsigned(usize, max_msgs_str, 10) catch {
+ log.debug("unsub can't parse bytes (eos): '{s}'", .{max_msgs_str});
+ return error.InvalidStream;
+ };
} else null;
return .{
.sid = sid,
@@ -529,7 +567,10 @@ test unsub {
/// The return value is owned by the reader passed to this function.
/// Operations that modify the readers buffer invalidates this value.
-pub fn hpub(in: *Reader) Error!Message.HPub {
+/// The arena_allocator is used to store the payload if it can't fit
+/// in the readers buffer.
+pub fn hpub(in: *Reader, arena_allocator: *std.heap.ArenaAllocator) (error{OutOfMemory} || Error)!Message.HPub {
+ _ = arena_allocator;
// TODO: Add pedantic option.
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
while (true) {
@@ -543,8 +584,14 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
const header_bytes_str = second;
const total_bytes_str = third;
- const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
- const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
+ const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch {
+ log.debug("hpub can't parse header bytes: '{s}'", .{header_bytes_str});
+ return error.InvalidStream;
+ };
+ const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch {
+ log.debug("hpub can't parse total bytes: '{s}'", .{header_bytes_str});
+ return error.InvalidStream;
+ };
if (in.buffered().len < iter.index + total_bytes + 4) {
try in.fill(iter.index + total_bytes + 4);
@@ -568,8 +615,14 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
const header_bytes_str = third;
if (iter.next()) |total_bytes_str| {
if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
- const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
- const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
+ const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch {
+ log.debug("hpub can't parse header bytes (with reply): '{s}'", .{header_bytes_str});
+ return error.InvalidStream;
+ };
+ const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch {
+ log.debug("hpub can't parse total bytes (with reply): '{s}'", .{header_bytes_str});
+ return error.InvalidStream;
+ };
if (in.buffered().len < iter.index + total_bytes + 4) {
try in.fill(iter.index + total_bytes + 4);
@@ -664,14 +717,18 @@ pub fn connect(alloc: Allocator, in: *Reader) (error{OutOfMemory} || Error)!Mess
connect_allocator,
connect_str,
.{ .allocate = .alloc_always },
- ) catch return error.InvalidStream;
+ ) catch {
+ log.debug("connect can't parse json body: '{s}'", .{connect_str});
+ return error.InvalidStream;
+ };
return res.dupe(alloc);
}
inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
- @branchHint(.unlikely);
+ @branchHint(.cold);
+ log.debug("expectStreamBytes wrong bytes", .{});
return error.InvalidStream;
}
}