diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-08 21:41:49 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-08 22:35:35 -0500 |
| commit | ed9911596915bfaefaae7b784af30502a13027b0 (patch) | |
| tree | 768c1ad445321f0641efbdac79c8585da5a709b0 /src/Server.zig | |
| parent | d8488fde4902565f4ac8519565f234918dab6b11 (diff) | |
More robust parsing and error propagation
Diffstat (limited to 'src/Server.zig')
| -rw-r--r-- | src/Server.zig | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/src/Server.zig b/src/Server.zig index 4ae959f..68a7c49 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -197,28 +197,49 @@ fn handleConnection( defer client.recv_queue_write_lock.unlock(io); _ = try client.from_client.take(2); try client.recv_queue.putAll(io, "PONG\r\n"); - // try client.send(io, "PONG\r\n"); }, .PUB => { @branchHint(.likely); // log.debug("received a pub msg", .{}); - try server.publishMessage(io, server_allocator, &client, .@"pub"); + server.publishMessage(io, server_allocator, &client, .@"pub") catch |err| switch (err) { + error.WriteFailed => return writer.err.?, + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .HPUB => { @branchHint(.likely); - try server.publishMessage(io, server_allocator, &client, .hpub); + server.publishMessage(io, server_allocator, &client, .hpub) catch |err| switch (err) { + error.WriteFailed => return writer.err.?, + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .SUB => { - try server.subscribe(io, server_allocator, &client, id); + server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .UNSUB => { - try server.unsubscribe(io, server_allocator, client, id); + server.unsubscribe(io, server_allocator, client, id) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, .CONNECT => { if (client.connect) |*current| { current.deinit(server_allocator); } - client.connect = try parse.connect(server_allocator, client.from_client); + client.connect = parse.connect(server_allocator, client.from_client) catch |err| switch (err) { + error.ReadFailed => return reader.err.?, + error.EndOfStream => return error.ClientDisconnected, + else => |e| return e, + }; }, else => |e| { panic("Unimplemented message: {any}\n", .{e}); @@ -291,11 +312,6 @@ fn publishMessage( .hpub => hpubmsg.@"pub", }; - // const subject = switch (pub_or_hpub) { - // .PUB => |pb| pb.subject, - // .HPUB => |hp| hp.@"pub".subject, - // else => unreachable, - // }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); var published_queue_groups: ArrayList([]const u8) = .empty; @@ -380,6 +396,7 @@ fn subscribe( .queue_lock = &client.recv_queue_write_lock, .queue = client.recv_queue, }); + log.debug("Client {d} subscribed to {s}", .{ id, msg.subject }); } fn unsubscribe( @@ -397,8 +414,10 @@ fn unsubscribe( const i = len - from_end - 1; const sub = server.subscriptions.items[i]; if (sub.client_id == id and eql(u8, sub.sid, msg.sid)) { + log.debug("Client {d} unsubscribed from {s}", .{ id, server.subscriptions.items[i].subject }); sub.deinit(gpa); _ = server.subscriptions.swapRemove(i); + break; } } } |
