summaryrefslogtreecommitdiff
path: root/src/Server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server.zig')
-rw-r--r--src/Server.zig22
1 files changed, 10 insertions, 12 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 7e30b76..21eecb4 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -200,32 +200,30 @@ fn handleConnection(
var client_task = try io.concurrent(Client.start, .{ &client, io });
defer client_task.cancel(io) catch {};
- // Messages are owned by the server after they are received from the client
- while (client.next(server_allocator)) |msg| {
- switch (msg) {
+ while (client.next(server_allocator)) |ctrl| {
+ switch (ctrl) {
.PING => {
// Respond to ping with pong.
- try client.send(io, .PONG);
+ try client.recv_queue_write_lock.lock(io);
+ defer client.recv_queue_write_lock.unlock(io);
+ try client.send(io, "PONG\r\n");
},
- .PUB => |pb| {
+ .PUB => {
@branchHint(.likely);
- defer pb.deinit(server_allocator);
try server.publishMessage(io, server_allocator, &client, msg);
},
- .HPUB => |hp| {
+ .HPUB => {
@branchHint(.likely);
- defer hp.deinit(server_allocator);
try server.publishMessage(io, server_allocator, &client, msg);
},
- .SUB => |sub| {
- defer sub.deinit(server_allocator);
+ .SUB => {
try server.subscribe(io, server_allocator, client, id, sub);
},
- .UNSUB => |unsub| {
+ .UNSUB => {
defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub);
},
- .CONNECT => |connect| {
+ .CONNECT => {
if (client.connect) |*current| {
current.deinit(server_allocator);
}