From 6e9f6998bd15f3c4dabd0ba28a1973573acdc765 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Tue, 6 Jan 2026 10:03:03 -0500 Subject: Use client allocator to own incoming messages to a client --- src/server/Client.zig | 14 ++++++++----- src/server/Server.zig | 55 +++++++++++++++++++++++++++++---------------------- 2 files changed, 40 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/server/Client.zig b/src/server/Client.zig index d099ecb..690cabf 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -10,6 +10,8 @@ pub const Msgs = union(enum) { }; connect: ?Message.Connect, +// Used to own messages that we receive in our queues. +alloc: std.mem.Allocator, // Messages for this client to receive. recv_queue: *Queue(Message), @@ -20,6 +22,7 @@ to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, + alloc: std.mem.Allocator, recv_queue: *Queue(Message), msg_queue: *Queue(Msgs), in: *std.Io.Reader, @@ -27,6 +30,7 @@ pub fn init( ) Client { return .{ .connect = connect, + .alloc = alloc, .recv_queue = recv_queue, .msg_queue = msg_queue, .from_client = in, @@ -41,7 +45,7 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { self.* = undefined; } -pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { +pub fn start(self: *Client, io: std.Io) !void { var msgs_buf: [1024]Msgs = undefined; var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); @@ -58,15 +62,15 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { for (0..msgs.len) |i| { const msg = msgs[i]; defer switch (msg) { - .MSG => |m| m.deinit(alloc), - .HMSG => |h| h.deinit(alloc), + .MSG => |m| m.deinit(self.alloc), + .HMSG => |h| h.deinit(self.alloc), }; errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { .MSG => |m| { - m.deinit(alloc); + m.deinit(self.alloc); }, .HMSG => |h| { - h.deinit(alloc); + h.deinit(self.alloc); }, }; switch (msg) { diff --git a/src/server/Server.zig b/src/server/Server.zig index 6b1eda5..d570c21 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -19,11 +19,16 @@ pub const Client = @import("./Client.zig"); const Msgs = Client.Msgs; const Server = @This(); +const builtin = @import("builtin"); +const safe_build = builtin.mode == .Debug or builtin.mode == .ReleaseSafe; + pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, queue: *Queue(Msgs), + // used to alloc messages in the queue + alloc: Allocator, fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -47,6 +52,7 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void { for (server.subscriptions.items) |sub| { sub.deinit(alloc); } + // TODO drain subscription queues server.subscriptions.deinit(alloc); server.clients.deinit(alloc); } @@ -135,44 +141,44 @@ fn handleConnection( ) !void { defer stream.close(io); - // TODO: use a client allocator for things that should only live for as long as the client? - // I had this before, but it seemed to have made lifetimes harder to track. - // Messages made sense to parse using a client allocator, but that makes it hard to free - // messages when done processing them (usually outside the client process, ie: publish). + var dba: std.heap.DebugAllocator(.{}) = .init; + dba.backing_allocator = server_allocator; + defer _ = dba.deinit(); + const alloc = if (safe_build) dba.allocator() else server_allocator; // Set up client writer - const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size); - defer server_allocator.free(w_buffer); + const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size); + defer alloc.free(w_buffer); var writer = stream.writer(io, w_buffer); const out = &writer.interface; // Set up client reader - const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size); - defer server_allocator.free(r_buffer); + const r_buffer: []u8 = try alloc.alloc(u8, r_buf_size); + defer alloc.free(r_buffer); var reader = stream.reader(io, r_buffer); const in = &reader.interface; // Set up buffer queue - const qbuf: []Message = try server_allocator.alloc(Message, 16); - defer server_allocator.free(qbuf); + const qbuf: []Message = try alloc.alloc(Message, 16); + defer alloc.free(qbuf); var recv_queue: Queue(Message) = .init(qbuf); defer recv_queue.close(io); - const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / @sizeOf(Msgs)); - defer server_allocator.free(mbuf); + const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs)); + defer alloc.free(mbuf); var msgs_queue: Queue(Msgs) = .init(mbuf); defer { msgs_queue.close(io); while (msgs_queue.getOne(io)) |msg| { switch (msg) { - .MSG => |m| m.deinit(server_allocator), - .HMSG => |h| h.deinit(server_allocator), + .MSG => |m| m.deinit(alloc), + .HMSG => |h| h.deinit(alloc), } } else |_| {} } // Create client - var client: Client = .init(null, &recv_queue, &msgs_queue, in, out); + var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out); defer client.deinit(server_allocator); try server.addClient(server_allocator, id, &client); @@ -182,7 +188,7 @@ fn handleConnection( // try recv_queue.putOne(io, .PONG); try recv_queue.putOne(io, .{ .INFO = server.info }); - var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); + var client_task = try io.concurrent(Client.start, .{ &client, io }); defer client_task.cancel(io) catch {}; // Messages are owned by the server after they are received from the client @@ -195,16 +201,16 @@ fn handleConnection( .PUB => |pb| { @branchHint(.likely); defer pb.deinit(server_allocator); - try server.publishMessage(io, server_allocator, &client, msg); + try server.publishMessage(io, &client, msg); }, .HPUB => |hp| { @branchHint(.likely); defer hp.deinit(server_allocator); - try server.publishMessage(io, server_allocator, &client, msg); + try server.publishMessage(io, &client, msg); }, .SUB => |sub| { defer sub.deinit(server_allocator); - try server.subscribe(io, server_allocator, id, &msgs_queue, sub); + try server.subscribe(io, server_allocator, client, id, sub); }, .UNSUB => |unsub| { defer unsub.deinit(server_allocator); @@ -264,7 +270,7 @@ test subjectMatches { try expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { +fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) !void { defer if (source_client.connect) |c| { if (c.verbose) { source_client.send(io, .@"+OK") catch {}; @@ -283,12 +289,12 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli switch (msg) { .PUB => |pb| { try subscription.queue.putOne(io, .{ - .MSG = try pb.toMsg(alloc, subscription.sid), + .MSG = try pb.toMsg(subscription.alloc, subscription.sid), }); }, .HPUB => |hp| { try subscription.queue.putOne(io, .{ - .HMSG = try hp.toHMsg(alloc, subscription.sid), + .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid), }); }, else => unreachable, @@ -301,8 +307,8 @@ fn subscribe( server: *Server, io: Io, gpa: Allocator, + client: Client, id: usize, - queue: *Queue(Msgs), msg: Message.Sub, ) !void { try server.subs_lock.lock(io); @@ -315,7 +321,8 @@ fn subscribe( .subject = subject, .client_id = id, .sid = sid, - .queue = queue, + .queue = client.msg_queue, + .alloc = client.alloc, }); } -- cgit