diff options
Diffstat (limited to 'src/server/Server.zig')
| -rw-r--r-- | src/server/Server.zig | 42 |
1 files changed, 38 insertions, 4 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig index 29ebab9..e5f199f 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -26,6 +26,7 @@ pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, + queue_group: ?[]const u8, queue: *Queue(Msgs), // used to alloc messages in the queue alloc: Allocator, @@ -33,6 +34,7 @@ pub const Subscription = struct { fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); + if (self.queue_group) |g| alloc.free(g); } }; @@ -201,12 +203,12 @@ fn handleConnection( .PUB => |pb| { @branchHint(.likely); defer pb.deinit(server_allocator); - try server.publishMessage(io, &client, msg); + try server.publishMessage(io, server_allocator, &client, msg); }, .HPUB => |hp| { @branchHint(.likely); defer hp.deinit(server_allocator); - try server.publishMessage(io, &client, msg); + try server.publishMessage(io, server_allocator, &client, msg); }, .SUB => |sub| { defer sub.deinit(server_allocator); @@ -270,7 +272,13 @@ test subjectMatches { try expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) !void { +fn publishMessage( + server: *Server, + io: Io, + alloc: Allocator, + source_client: *Client, + msg: Message, +) !void { defer if (source_client.connect) |c| { if (c.verbose) { source_client.send(io, .@"+OK") catch {}; @@ -284,8 +292,26 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); - for (server.subscriptions.items) |subscription| { + var published_queue_groups: ArrayList([]const u8) = .empty; + defer published_queue_groups.deinit(alloc); + var published_queue_sub_idxs: ArrayList(usize) = .empty; + defer published_queue_sub_idxs.deinit(alloc); + + subs: for (0..server.subscriptions.items.len) |i| { + const subscription = server.subscriptions.items[i]; if (subjectMatches(subscription.subject, subject)) { + if (subscription.queue_group) |sg| { + for (published_queue_groups.items) |g| { + if (eql(u8, g, sg)) { + continue :subs; + } + } + // Don't republish to the same queue + try published_queue_groups.append(alloc, sg); + // Move this index to the end of the subscription list, + // to prioritize other subscriptions in the queue next time. + try published_queue_sub_idxs.append(alloc, i); + } switch (msg) { .PUB => |pb| { try subscription.queue.putOne(io, .{ @@ -301,6 +327,11 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) } } } + + for (0..published_queue_sub_idxs.items.len) |from_end| { + const i = published_queue_sub_idxs.items.len - from_end - 1; + server.subscriptions.appendAssumeCapacity(server.subscriptions.orderedRemove(i)); + } } fn subscribe( @@ -317,10 +348,13 @@ fn subscribe( errdefer gpa.free(subject); const sid = try gpa.dupe(u8, msg.sid); errdefer gpa.free(sid); + const queue_group = if (msg.queue_group) |q| try gpa.dupe(u8, q) else null; + errdefer if (queue_group) |q| gpa.free(q); try server.subscriptions.append(gpa, .{ .subject = subject, .client_id = id, .sid = sid, + .queue_group = queue_group, .queue = client.msg_queue, .alloc = client.alloc, }); |
