summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 02:40:51 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 05:22:02 +0000
commit0233bc278c263d356d427ca9df48028bfac42515 (patch)
treea326f13577ce11a65c124eea42b3cb0f55e1c300 /src
parentb447883d106f0ac427b0b0a00a8015be8eb4730c (diff)
Avoid queues completely
This is quite slow
Diffstat (limited to 'src')
-rw-r--r--src/server/main.zig43
1 files changed, 21 insertions, 22 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index e5f6e59..0efefc3 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -17,8 +17,6 @@ clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
subs_lock: std.Io.Mutex = .init,
subscriptions: std.ArrayList(Subscription) = .empty,
-msg_queue: std.Io.Queue(Message.Pub),
-
var keep_running = std.atomic.Value(bool).init(true);
fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
@@ -37,22 +35,14 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
// // Register the handler for SIGINT (Ctrl+C)
// std.posix.sigaction(std.posix.SIG.INT, &act, null);
- // 64 mb buffer for messages
- const queue_buf = try gpa.alloc(Message.Pub, 1024 * 1024);
- defer gpa.free(queue_buf);
-
var server: Server = .{
.info = server_config,
- .msg_queue = .init(queue_buf),
};
var threaded: std.Io.Threaded = .init(gpa, .{});
defer threaded.deinit();
const io = threaded.io();
- var msgProcess = try io.concurrent(processMsgs, .{ &server, io, gpa });
- defer msgProcess.cancel(io) catch {};
-
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -75,9 +65,9 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
std.debug.print("Exiting gracefully\n", .{});
}
-fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void {
+fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
while (true) {
- const msg = try server.msg_queue.getOne(io);
+ const msg = server.msg_queue.getOne(io) catch break;
defer msg.deinit(alloc);
for (server.subscriptions.items) |subscription| {
@@ -86,12 +76,12 @@ fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void {
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
continue;
};
- try client.send(io, .{ .msg = .{
+ client.send(io, .{ .msg = .{
.subject = msg.subject,
.sid = subscription.sid,
.reply_to = msg.reply_to,
.payload = msg.payload,
- } });
+ } }) catch continue;
}
}
}
@@ -157,8 +147,7 @@ fn handleConnection(
try client_state.send(io, .pong);
},
.@"pub" => |pb| {
- // Do not free pb, server.publishMessage takes ownership.
- try server.publishMessage(io, pb);
+ try server.publishMessage(io, server_allocator, pb);
if (client_state.connect) |c| {
if (c.verbose) {
try client_state.send(io, .@"+ok");
@@ -194,12 +183,22 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
-fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
- try server.msg_queue.putOne(io, .{
- .payload = msg.payload,
- .reply_to = msg.reply_to,
- .subject = msg.subject,
- });
+fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void {
+ defer msg.deinit(gpa);
+ for (server.subscriptions.items) |subscription| {
+ if (subjectMatches(subscription.subject, msg.subject)) {
+ const client = server.clients.get(subscription.client_id) orelse {
+ std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
+ continue;
+ };
+ client.send(io, .{ .msg = .{
+ .subject = msg.subject,
+ .sid = subscription.sid,
+ .reply_to = msg.reply_to,
+ .payload = msg.payload,
+ } }) catch continue;
+ }
+ }
}
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {