summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 13:42:30 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 14:06:22 -0500
commitc676a8218e9d491f96947c6c41e645a77bb99480 (patch)
tree3156c6a145bc42cd5cb5e510628c2f509ebe1b05 /src
parent81a93654a18d63b287fafda3f893cecf66e6edad (diff)
Support queue groups
Diffstat (limited to 'src')
-rw-r--r--src/server/Server.zig42
1 files changed, 38 insertions, 4 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig
index 29ebab9..e5f199f 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -26,6 +26,7 @@ pub const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
+ queue_group: ?[]const u8,
queue: *Queue(Msgs),
// used to alloc messages in the queue
alloc: Allocator,
@@ -33,6 +34,7 @@ pub const Subscription = struct {
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
+ if (self.queue_group) |g| alloc.free(g);
}
};
@@ -201,12 +203,12 @@ fn handleConnection(
.PUB => |pb| {
@branchHint(.likely);
defer pb.deinit(server_allocator);
- try server.publishMessage(io, &client, msg);
+ try server.publishMessage(io, server_allocator, &client, msg);
},
.HPUB => |hp| {
@branchHint(.likely);
defer hp.deinit(server_allocator);
- try server.publishMessage(io, &client, msg);
+ try server.publishMessage(io, server_allocator, &client, msg);
},
.SUB => |sub| {
defer sub.deinit(server_allocator);
@@ -270,7 +272,13 @@ test subjectMatches {
try expect(subjectMatches("foo.>", "foo.bar.baz"));
}
-fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) !void {
+fn publishMessage(
+ server: *Server,
+ io: Io,
+ alloc: Allocator,
+ source_client: *Client,
+ msg: Message,
+) !void {
defer if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"+OK") catch {};
@@ -284,8 +292,26 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message)
};
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
- for (server.subscriptions.items) |subscription| {
+ var published_queue_groups: ArrayList([]const u8) = .empty;
+ defer published_queue_groups.deinit(alloc);
+ var published_queue_sub_idxs: ArrayList(usize) = .empty;
+ defer published_queue_sub_idxs.deinit(alloc);
+
+ subs: for (0..server.subscriptions.items.len) |i| {
+ const subscription = server.subscriptions.items[i];
if (subjectMatches(subscription.subject, subject)) {
+ if (subscription.queue_group) |sg| {
+ for (published_queue_groups.items) |g| {
+ if (eql(u8, g, sg)) {
+ continue :subs;
+ }
+ }
+ // Don't republish to the same queue
+ try published_queue_groups.append(alloc, sg);
+ // Move this index to the end of the subscription list,
+ // to prioritize other subscriptions in the queue next time.
+ try published_queue_sub_idxs.append(alloc, i);
+ }
switch (msg) {
.PUB => |pb| {
try subscription.queue.putOne(io, .{
@@ -301,6 +327,11 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message)
}
}
}
+
+ for (0..published_queue_sub_idxs.items.len) |from_end| {
+ const i = published_queue_sub_idxs.items.len - from_end - 1;
+ server.subscriptions.appendAssumeCapacity(server.subscriptions.orderedRemove(i));
+ }
}
fn subscribe(
@@ -317,10 +348,13 @@ fn subscribe(
errdefer gpa.free(subject);
const sid = try gpa.dupe(u8, msg.sid);
errdefer gpa.free(sid);
+ const queue_group = if (msg.queue_group) |q| try gpa.dupe(u8, q) else null;
+ errdefer if (queue_group) |q| gpa.free(q);
try server.subscriptions.append(gpa, .{
.subject = subject,
.client_id = id,
.sid = sid,
+ .queue_group = queue_group,
.queue = client.msg_queue,
.alloc = client.alloc,
});