summaryrefslogtreecommitdiff
path: root/src/Server.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-08 21:41:49 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-08 22:35:35 -0500
commited9911596915bfaefaae7b784af30502a13027b0 (patch)
tree768c1ad445321f0641efbdac79c8585da5a709b0 /src/Server.zig
parentd8488fde4902565f4ac8519565f234918dab6b11 (diff)
More robust parsing and error propagation
Diffstat (limited to 'src/Server.zig')
-rw-r--r--src/Server.zig41
1 files changed, 30 insertions, 11 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 4ae959f..68a7c49 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -197,28 +197,49 @@ fn handleConnection(
defer client.recv_queue_write_lock.unlock(io);
_ = try client.from_client.take(2);
try client.recv_queue.putAll(io, "PONG\r\n");
- // try client.send(io, "PONG\r\n");
},
.PUB => {
@branchHint(.likely);
// log.debug("received a pub msg", .{});
- try server.publishMessage(io, server_allocator, &client, .@"pub");
+ server.publishMessage(io, server_allocator, &client, .@"pub") catch |err| switch (err) {
+ error.WriteFailed => return writer.err.?,
+ error.ReadFailed => return reader.err.?,
+ error.EndOfStream => return error.ClientDisconnected,
+ else => |e| return e,
+ };
},
.HPUB => {
@branchHint(.likely);
- try server.publishMessage(io, server_allocator, &client, .hpub);
+ server.publishMessage(io, server_allocator, &client, .hpub) catch |err| switch (err) {
+ error.WriteFailed => return writer.err.?,
+ error.ReadFailed => return reader.err.?,
+ error.EndOfStream => return error.ClientDisconnected,
+ else => |e| return e,
+ };
},
.SUB => {
- try server.subscribe(io, server_allocator, &client, id);
+ server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) {
+ error.ReadFailed => return reader.err.?,
+ error.EndOfStream => return error.ClientDisconnected,
+ else => |e| return e,
+ };
},
.UNSUB => {
- try server.unsubscribe(io, server_allocator, client, id);
+ server.unsubscribe(io, server_allocator, client, id) catch |err| switch (err) {
+ error.ReadFailed => return reader.err.?,
+ error.EndOfStream => return error.ClientDisconnected,
+ else => |e| return e,
+ };
},
.CONNECT => {
if (client.connect) |*current| {
current.deinit(server_allocator);
}
- client.connect = try parse.connect(server_allocator, client.from_client);
+ client.connect = parse.connect(server_allocator, client.from_client) catch |err| switch (err) {
+ error.ReadFailed => return reader.err.?,
+ error.EndOfStream => return error.ClientDisconnected,
+ else => |e| return e,
+ };
},
else => |e| {
panic("Unimplemented message: {any}\n", .{e});
@@ -291,11 +312,6 @@ fn publishMessage(
.hpub => hpubmsg.@"pub",
};
- // const subject = switch (pub_or_hpub) {
- // .PUB => |pb| pb.subject,
- // .HPUB => |hp| hp.@"pub".subject,
- // else => unreachable,
- // };
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
var published_queue_groups: ArrayList([]const u8) = .empty;
@@ -380,6 +396,7 @@ fn subscribe(
.queue_lock = &client.recv_queue_write_lock,
.queue = client.recv_queue,
});
+ log.debug("Client {d} subscribed to {s}", .{ id, msg.subject });
}
fn unsubscribe(
@@ -397,8 +414,10 @@ fn unsubscribe(
const i = len - from_end - 1;
const sub = server.subscriptions.items[i];
if (sub.client_id == id and eql(u8, sub.sid, msg.sid)) {
+ log.debug("Client {d} unsubscribed from {s}", .{ id, server.subscriptions.items[i].subject });
sub.deinit(gpa);
_ = server.subscriptions.swapRemove(i);
+ break;
}
}
}