From ed9911596915bfaefaae7b784af30502a13027b0 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Thu, 8 Jan 2026 21:41:49 -0500 Subject: More robust parsing and error propagation --- src/Server.zig | 41 ++++++++++----- src/Server/parse.zig | 137 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 118 insertions(+), 60 deletions(-) diff --git a/src/Server.zig b/src/Server.zig index 4ae959f..68a7c49 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -197,28 +197,49 @@ fn handleConnection( defer client.recv_queue_write_lock.unlock(io); _ = try client.from_client.take(2); try client.recv_queue.putAll(io, "PONG\r\n"); - // try client.send(io, "PONG\r\n"); }, .PUB => { @branchHint(.likely); // log.debug("received a pub msg", .{}); - try server.publishMessage(io, server_allocator, &client, .@"pub"); + server.publishMessage(io, server_allocator, &client, .@"pub") catch |err| switch (err) { + error.WriteFailed => return writer.err.?, + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .HPUB => { @branchHint(.likely); - try server.publishMessage(io, server_allocator, &client, .hpub); + server.publishMessage(io, server_allocator, &client, .hpub) catch |err| switch (err) { + error.WriteFailed => return writer.err.?, + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .SUB => { - try server.subscribe(io, server_allocator, &client, id); + server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .UNSUB => { - try server.unsubscribe(io, server_allocator, client, id); + server.unsubscribe(io, server_allocator, client, id) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .CONNECT => { if (client.connect) |*current| { current.deinit(server_allocator); } - client.connect = try parse.connect(server_allocator, client.from_client); + client.connect = parse.connect(server_allocator, client.from_client) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, else => |e| { panic("Unimplemented message: {any}\n", .{e}); @@ -291,11 +312,6 @@ fn publishMessage( .hpub => hpubmsg.@"pub", }; - // const subject = switch (pub_or_hpub) { - // .PUB => |pb| pb.subject, - // .HPUB => |hp| hp.@"pub".subject, - // else => unreachable, - // }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); var published_queue_groups: ArrayList([]const u8) = .empty; @@ -380,6 +396,7 @@ fn subscribe( .queue_lock = &client.recv_queue_write_lock, .queue = client.recv_queue, }); + log.debug("Client {d} subscribed to {s}", .{ id, msg.subject }); } fn unsubscribe( @@ -397,8 +414,10 @@ fn unsubscribe( const i = len - from_end - 1; const sub = server.subscriptions.items[i]; if (sub.client_id == id and eql(u8, sub.sid, msg.sid)) { + log.debug("Client {d} unsubscribed from {s}", .{ id, server.subscriptions.items[i].subject }); sub.deinit(gpa); _ = server.subscriptions.swapRemove(i); + break; } } } diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 6ae99a5..2aca6e8 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -45,6 +45,8 @@ pub fn control(in: *Reader) !message.Control { break :blk min_len; }; std.debug.assert(in.buffer.len >= longest_ctrl); + // Wait until at least the enough text to parse the shortest control value is available + try in.fill(3); while (true) { var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); if (iter.next()) |str| { @@ -55,6 +57,7 @@ pub fn control(in: *Reader) !message.Control { return error.InvalidControl; } } + log.debug("filling more in control.", .{}); try in.fillMore(); } } @@ -146,7 +149,7 @@ pub fn @"pub"(in: *Reader) !Message.Pub { if (iter.next()) |bytes_str| { const bytes = try parseUnsigned(usize, bytes_str, 10); - if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { if (in.buffered().len < iter.index + bytes + 4) { try in.fill(iter.index + bytes + 4); // Fill may shift buffer, so we have to retokenize it. @@ -287,7 +290,7 @@ pub fn sub(in: *Reader) !Message.Sub { const queue_group = second; if (iter.next()) |sid| { - if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { if (in.buffered().len < iter.index + 2) { try in.fill(iter.index + 2); // Fill may shift buffer, so we have to retokenize it. @@ -380,27 +383,11 @@ pub fn unsub(in: *Reader) !Message.Unsub { // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 while (true) { - var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n"); if (iter.next()) |sid| { - if (in.buffered()[iter.index] == '\r') { - if (in.buffered().len < iter.index + 2) { - try in.fill(iter.index + 2); - // Fill may shift buffer, so we have to retokenize it. - continue; - } - - // 2 bytes for CRLF at the end. - in.toss(iter.index + 2); - return .{ - .sid = sid, - .max_msgs = null, - }; - } - if (iter.next()) |max_msgs_str| { + if (in.buffered().len > iter.index) { if (in.buffered()[iter.index] == '\r') { - const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); - if (in.buffered().len < iter.index + 2) { try in.fill(iter.index + 2); // Fill may shift buffer, so we have to retokenize it. @@ -409,16 +396,48 @@ pub fn unsub(in: *Reader) !Message.Unsub { // 2 bytes for CRLF at the end. in.toss(iter.index + 2); - return .{ .sid = sid, - .max_msgs = max_msgs, + .max_msgs = null, }; } + if (iter.next()) |max_msgs_str| { + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { + const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); + + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + + // 2 bytes for CRLF at the end. + in.toss(iter.index + 2); + + return .{ + .sid = sid, + .max_msgs = max_msgs, + }; + } + } } } - try in.fillMore(); + in.fillMore() catch |err| switch (err) { + error.EndOfStream => { + iter.reset(); + const sid = iter.next() orelse return error.EndOfStream; + const max_msgs = if (iter.next()) |max_msgs_str| blk: { + log.debug("max_msgs: {any}", .{max_msgs_str}); + break :blk try parseUnsigned(usize, max_msgs_str, 10); + } else null; + return .{ + .sid = sid, + .max_msgs = max_msgs, + }; + }, + else => |e| return e, + }; } } @@ -478,6 +497,20 @@ test unsub { try unsub(&in.interface), ); } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " 1\r" }, + .{ .buffer = "\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "1", + .max_msgs = null, + }, + try unsub(&in.interface), + ); + } } /// The return value is owned by the reader passed to this function. @@ -521,28 +554,26 @@ pub fn hpub(in: *Reader) !Message.HPub { const reply_to = second; const header_bytes_str = third; if (iter.next()) |total_bytes_str| { - if (in.buffered().len > iter.index) { - if (in.buffered()[iter.index] == '\r') { - const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); - const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); - - if (in.buffered().len < iter.index + total_bytes + 4) { - try in.fill(iter.index + total_bytes + 4); - continue; - } - - // 4 bytes for CRLF on either side of headers and payload. - in.toss(iter.index + 2); - defer in.toss(2); - return .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .reply_to = reply_to, - .payload = in.take(total_bytes) catch unreachable, - }, - }; + if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); + + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; } + + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(total_bytes) catch unreachable, + }, + }; } } } @@ -593,7 +624,12 @@ test hpub { // TODO: more tests } -pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { +pub fn connect(alloc: Allocator, in: *Reader) error{ + EndOfStream, + ReadFailed, + OutOfMemory, + InvalidStream, +}!Message.Connect { // for storing the json string var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); @@ -607,18 +643,21 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { try in.discardAll(1); // throw away space // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); + _ = in.streamDelimiter(connect_string_writer, '}') catch |err| switch (err) { + error.WriteFailed => return error.OutOfMemory, + else => |e| return e, + }; + connect_string_writer.writeByte('}') catch return error.OutOfMemory; try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' const connect_str = try connect_string_writer_allocating.toOwnedSlice(); defer alloc.free(connect_str); - const res = try std.json.parseFromSliceLeaky( + const res = std.json.parseFromSliceLeaky( Message.Connect, connect_allocator, connect_str, .{ .allocate = .alloc_always }, - ); + ) catch return error.InvalidStream; return res.dupe(alloc); } -- cgit