diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-09 15:43:40 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-10 00:12:52 -0500 |
| commit | 0ebc39b5e872478e4d3059966ffc44a13231660e (patch) | |
| tree | ca5569cb52df85b0773362dbdf56921640072cf9 | |
| parent | f4b545f852fc52060f3b3fb55f69a8d7c52f8aa5 (diff) | |
parsing cleanup
| -rw-r--r-- | src/Server.zig | 7 | ||||
| -rw-r--r-- | src/Server/parse.zig | 115 |
2 files changed, 62 insertions, 60 deletions
diff --git a/src/Server.zig b/src/Server.zig index 68a7c49..d183593 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -195,7 +195,7 @@ fn handleConnection( // Respond to ping with pong. try client.recv_queue_write_lock.lock(io); defer client.recv_queue_write_lock.unlock(io); - _ = try client.from_client.take(2); + _ = try client.from_client.take(2); // throw out \r\n try client.recv_queue.putAll(io, "PONG\r\n"); }, .PUB => { @@ -355,13 +355,12 @@ fn publishMessage( }, else => {}, } - try line_writer.print("{d}\r\n", .{msg.payload.len}); + try line_writer.print("{d}\r\n", .{msg.payload.len - 2}); try subscription.queue_lock.lock(io); defer subscription.queue_lock.unlock(io); try subscription.queue.putAll(io, line_writer.buffered()); try subscription.queue.putAll(io, msg.payload); - try subscription.queue.putAll(io, "\r\n"); } } @@ -426,7 +425,7 @@ fn unsubscribe( /// Try to match the kernel socket buffers to maximize /// the amount of data we push through each syscall. fn getBufferSizes(io: Io) @Tuple(&.{ usize, usize }) { - const default_size = 4 * 1024; + const default_size = 8 * 1024 * 1024; const default = .{ default_size, default_size }; const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { diff --git a/src/Server/parse.zig b/src/Server/parse.zig index b8d979a..cda5985 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -126,54 +126,59 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub { // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 while (true) { - var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); - - if (iter.next()) |subject| { - if (iter.next()) |second| { - if (in.buffered().len > iter.index) { - if (in.buffered()[iter.index] == '\r') { - const bytes_str = second; - const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; - - if (in.buffered().len < iter.index + bytes + 4) { - try in.fill(iter.index + bytes + 4); - // Fill may shift buffer, so we have to retokenize it. - continue; - } - - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .subject = subject, - .reply_to = null, - .payload = in.take(bytes) catch unreachable, - }; - } - - const reply_to = second; - if (iter.next()) |bytes_str| { - const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n"); + const subject = iter.next() orelse { + try in.fillMore(); + continue; + }; + const second = iter.next() orelse { + try in.fillMore(); + continue; + }; + if (in.buffered().len == iter.index) { + try in.fillMore(); + continue; + } + if (in.buffered()[iter.index] == '\r') { + const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream; + if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { + try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len); + continue; + } + in.toss(iter.index + "\r\n".len); + return .{ + .subject = subject, + .reply_to = null, + .payload = in.take(bytes + 2) catch unreachable, + }; + } - if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { - if (in.buffered().len < iter.index + bytes + 4) { - try in.fill(iter.index + bytes + 4); - // Fill may shift buffer, so we have to retokenize it. - continue; - } + switch (in.buffered()[iter.index]) { + '\t', ' ' => { + const reply_to = second; + const bytes = parseUnsigned(usize, iter.next() orelse { + try in.fillMore(); + continue; + }, 10) catch return error.InvalidStream; + + if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') { + try in.fillMore(); + continue; + } - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .subject = subject, - .reply_to = reply_to, - .payload = in.take(bytes) catch unreachable, - }; - } - } + if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) { + try in.fillMore(); + continue; } - } + + in.toss(iter.index + "\r\n".len); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(bytes + 2) catch unreachable, + }; + }, + else => {}, } try in.fillMore(); @@ -190,7 +195,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = "bar", - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -205,7 +210,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -221,7 +226,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -238,7 +243,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -256,7 +261,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi", + .payload = "hi\r\n", }, try @"pub"(&in.interface), ); @@ -545,13 +550,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub { // 4 bytes for CRLF on either side of headers and payload. in.toss(iter.index + 2); - defer in.toss(2); return .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, .reply_to = null, - .payload = in.take(total_bytes) catch unreachable, + .payload = in.take(total_bytes + 2) catch unreachable, }, }; } @@ -571,13 +575,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub { // 4 bytes for CRLF on either side of headers and payload. in.toss(iter.index + 2); - defer in.toss(2); return .{ .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, .reply_to = reply_to, - .payload = in.take(total_bytes) catch unreachable, + .payload = in.take(total_bytes + 2) catch unreachable, }, }; } @@ -602,7 +605,7 @@ test hpub { .@"pub" = .{ .subject = "foo", .reply_to = null, - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, try hpub(&in.interface), @@ -620,7 +623,7 @@ test hpub { .@"pub" = .{ .subject = "foo", .reply_to = "reply", - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, try hpub(&in.interface), |
