diff options
Diffstat (limited to 'src/Server.zig')
| -rw-r--r-- | src/Server.zig | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/src/Server.zig b/src/Server.zig index 7e30b76..21eecb4 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -200,32 +200,30 @@ fn handleConnection( var client_task = try io.concurrent(Client.start, .{ &client, io }); defer client_task.cancel(io) catch {}; - // Messages are owned by the server after they are received from the client - while (client.next(server_allocator)) |msg| { - switch (msg) { + while (client.next(server_allocator)) |ctrl| { + switch (ctrl) { .PING => { // Respond to ping with pong. - try client.send(io, .PONG); + try client.recv_queue_write_lock.lock(io); + defer client.recv_queue_write_lock.unlock(io); + try client.send(io, "PONG\r\n"); }, - .PUB => |pb| { + .PUB => { @branchHint(.likely); - defer pb.deinit(server_allocator); try server.publishMessage(io, server_allocator, &client, msg); }, - .HPUB => |hp| { + .HPUB => { @branchHint(.likely); - defer hp.deinit(server_allocator); try server.publishMessage(io, server_allocator, &client, msg); }, - .SUB => |sub| { - defer sub.deinit(server_allocator); + .SUB => { try server.subscribe(io, server_allocator, client, id, sub); }, - .UNSUB => |unsub| { + .UNSUB => { defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, - .CONNECT => |connect| { + .CONNECT => { if (client.connect) |*current| { current.deinit(server_allocator); } |
