summaryrefslogtreecommitdiff
path: root/src/server/main.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-02 16:01:35 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-02 18:10:25 +0000
commitcd5281030ee6cede5a39f8360d47c6c9ed9269d3 (patch)
treeb639b5f6d9e9fdbdb6d6a0b080f8886f3eec05f8 /src/server/main.zig
parent2be370e379959e2763e70851cf14ecfca07754fc (diff)
Diffstat (limited to 'src/server/main.zig')
-rw-r--r--src/server/main.zig46
1 files changed, 35 insertions, 11 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index 81c8fbd..ba74987 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -83,6 +83,10 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
server.info.port,
), io, .{ .reuse_address = true });
defer tcp_server.deinit(io);
+ std.log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
+ std.log.debug("Server max payload: {d}", .{server.info.max_payload});
+ std.log.info("Server ID: {s}", .{server.info.server_id});
+ std.log.info("Server name: {s}", .{server.info.server_name});
std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port });
var client_group: std.Io.Group = .init;
@@ -185,9 +189,13 @@ fn handleConnection(
// Respond to ping with pong.
try client.send(io, .pong);
},
- .@"pub" => |pb| {
- defer pb.deinit(server_allocator);
- try server.publishMessage(io, server_allocator, &client, pb);
+ .@"pub", .hpub => {
+ defer switch (msg) {
+ .@"pub" => |pb| pb.deinit(server_allocator),
+ .hpub => |hp| hp.deinit(server_allocator),
+ else => unreachable,
+ };
+ try server.publishMessage(io, server_allocator, &client, msg);
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -248,7 +256,7 @@ test subjectMatches {
try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz"));
}
-fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
+fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
@@ -256,20 +264,36 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
}
}
}
+ const subject = switch (msg) {
+ .@"pub" => |pb| pb.subject,
+ .hpub => |hp| hp.@"pub".subject,
+ else => unreachable,
+ };
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| {
- if (subjectMatches(subscription.subject, msg.subject)) {
+ if (subjectMatches(subscription.subject, subject)) {
const client = server.clients.get(subscription.client_id) orelse {
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
continue;
};
- client.send(io, .{
- .msg = try msg.toMsg(alloc, subscription.sid),
- }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
- };
+
+ switch (msg) {
+ .@"pub" => |pb| client.send(io, .{
+ .msg = try pb.toMsg(alloc, subscription.sid),
+ }) catch |err| switch (err) {
+ error.Canceled => return err,
+ else => {},
+ },
+ .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
+ alloc,
+ subscription.sid,
+ ) }) catch |err| switch (err) {
+ error.Canceled => return err,
+ else => {},
+ },
+ else => unreachable,
+ }
}
}
if (source_client.connect) |c| {