diff options
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index 81c8fbd..ba74987 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -83,6 +83,10 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { server.info.port, ), io, .{ .reuse_address = true }); defer tcp_server.deinit(io); + std.log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"}); + std.log.debug("Server max payload: {d}", .{server.info.max_payload}); + std.log.info("Server ID: {s}", .{server.info.server_id}); + std.log.info("Server name: {s}", .{server.info.server_name}); std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port }); var client_group: std.Io.Group = .init; @@ -185,9 +189,13 @@ fn handleConnection( // Respond to ping with pong. try client.send(io, .pong); }, - .@"pub" => |pb| { - defer pb.deinit(server_allocator); - try server.publishMessage(io, server_allocator, &client, pb); + .@"pub", .hpub => { + defer switch (msg) { + .@"pub" => |pb| pb.deinit(server_allocator), + .hpub => |hp| hp.deinit(server_allocator), + else => unreachable, + }; + try server.publishMessage(io, server_allocator, &client, msg); }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -248,7 +256,7 @@ test subjectMatches { try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { +fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -256,20 +264,36 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ } } } + const subject = switch (msg) { + .@"pub" => |pb| pb.subject, + .hpub => |hp| hp.@"pub".subject, + else => unreachable, + }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |subscription| { - if (subjectMatches(subscription.subject, msg.subject)) { + if (subjectMatches(subscription.subject, subject)) { const client = server.clients.get(subscription.client_id) orelse { std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); continue; }; - client.send(io, .{ - .msg = try msg.toMsg(alloc, subscription.sid), - }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, - }; + + switch (msg) { + .@"pub" => |pb| client.send(io, .{ + .msg = try pb.toMsg(alloc, subscription.sid), + }) catch |err| switch (err) { + error.Canceled => return err, + else => {}, + }, + .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( + alloc, + subscription.sid, + ) }) catch |err| switch (err) { + error.Canceled => return err, + else => {}, + }, + else => unreachable, + } } } if (source_client.connect) |c| { |
