diff options
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index e5f6e59..0efefc3 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -17,8 +17,6 @@ clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty, subs_lock: std.Io.Mutex = .init, subscriptions: std.ArrayList(Subscription) = .empty, -msg_queue: std.Io.Queue(Message.Pub), - var keep_running = std.atomic.Value(bool).init(true); fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void { @@ -37,22 +35,14 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { // // Register the handler for SIGINT (Ctrl+C) // std.posix.sigaction(std.posix.SIG.INT, &act, null); - // 64 mb buffer for messages - const queue_buf = try gpa.alloc(Message.Pub, 1024 * 1024); - defer gpa.free(queue_buf); - var server: Server = .{ .info = server_config, - .msg_queue = .init(queue_buf), }; var threaded: std.Io.Threaded = .init(gpa, .{}); defer threaded.deinit(); const io = threaded.io(); - var msgProcess = try io.concurrent(processMsgs, .{ &server, io, gpa }); - defer msgProcess.cancel(io) catch {}; - var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( server.info.host, server.info.port, @@ -75,9 +65,9 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { std.debug.print("Exiting gracefully\n", .{}); } -fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void { +fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void { while (true) { - const msg = try server.msg_queue.getOne(io); + const msg = server.msg_queue.getOne(io) catch break; defer msg.deinit(alloc); for (server.subscriptions.items) |subscription| { @@ -86,12 +76,12 @@ fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void { std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); continue; }; - try client.send(io, .{ .msg = .{ + client.send(io, .{ .msg = .{ .subject = msg.subject, .sid = subscription.sid, .reply_to = msg.reply_to, .payload = msg.payload, - } }); + } }) catch continue; } } } @@ -157,8 +147,7 @@ fn handleConnection( try client_state.send(io, .pong); }, .@"pub" => |pb| { - // Do not free pb, server.publishMessage takes ownership. - try server.publishMessage(io, pb); + try server.publishMessage(io, server_allocator, pb); if (client_state.connect) |c| { if (c.verbose) { try client_state.send(io, .@"+ok"); @@ -194,12 +183,22 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } -fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { - try server.msg_queue.putOne(io, .{ - .payload = msg.payload, - .reply_to = msg.reply_to, - .subject = msg.subject, - }); +fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void { + defer msg.deinit(gpa); + for (server.subscriptions.items) |subscription| { + if (subjectMatches(subscription.subject, msg.subject)) { + const client = server.clients.get(subscription.client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); + continue; + }; + client.send(io, .{ .msg = .{ + .subject = msg.subject, + .sid = subscription.sid, + .reply_to = msg.reply_to, + .payload = msg.payload, + } }) catch continue; + } + } } fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { |
