diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-31 03:11:42 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:22:02 +0000 |
| commit | a78187603871238c154d2fec105eecbb15cee225 (patch) | |
| tree | 03dbd800f403eb51d9173166bbeb9dbf5bbe3341 /src/server | |
| parent | 0f138e5984cbebe64bc398513597d62f4e879b05 (diff) | |
Simplified a ton of things and cleaned up ownership
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/client.zig | 17 | ||||
| -rw-r--r-- | src/server/main.zig | 90 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 28 |
3 files changed, 89 insertions, 46 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 9acc3b1..69e655b 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -7,6 +7,8 @@ pub const ClientState = struct { /// Messages that this client should receive. recv_queue: std.Io.Queue(Message) = undefined, recv_queue_buffer: [1024]Message = undefined, + // Used to take ownership of values as they are put in the queue. + recv_alloc: std.mem.Allocator, from_client: *std.Io.Reader, to_client: *std.Io.Writer, @@ -15,11 +17,13 @@ pub const ClientState = struct { pub fn init( connect: Message.AllocatedConnect, + alloc: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, ) !ClientState { var res: ClientState = .{ .connect = connect, + .recv_alloc = alloc, .from_client = in, .to_client = out, }; @@ -49,6 +53,7 @@ pub const ClientState = struct { writeInfo(self.to_client, info) catch break; }, .msg => |m| { + defer m.deinit(self.recv_alloc); writeMsg(self.to_client, m) catch break; }, else => { @@ -68,8 +73,16 @@ pub const ClientState = struct { } /// Return true if the value was put in the clients buffer to process, else false. - pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!void { - try self.recv_queue.putOne(io, msg); + pub fn send(self: *ClientState, io: std.Io, msg: Message) !void { + // Client needs to own msg that is put in its queue + switch (msg) { + .msg => |m| { + try self.recv_queue.putOne(io, .{ .msg = try m.dupe(self.recv_alloc) }); + }, + else => { + try self.recv_queue.putOne(io, msg); + }, + } } pub fn next(self: *ClientState, allocator: std.mem.Allocator) !Message { diff --git a/src/server/main.zig b/src/server/main.zig index f68e5d3..7328258 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -5,10 +5,15 @@ pub const ServerInfo = Message.ServerInfo; const ClientState = @import("./client.zig").ClientState; const Server = @This(); +const Subscription = struct { + subject: []const u8, + client_id: usize, + sid: []const u8, +}; + info: ServerInfo, clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty, -/// Map of subjects to a map of (client ID => SID) -subscriptions: std.StringHashMapUnmanaged(std.AutoHashMapUnmanaged(usize, []const u8)) = .empty, +subscriptions: std.ArrayList(Subscription) = .empty, var keep_running = std.atomic.Value(bool).init(true); @@ -100,14 +105,19 @@ fn handleConnection( var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = try .init(connect, in, out); + var client_state: ClientState = try .init(connect, allocator, in, out); try client_state.start(io); defer client_state.deinit(io, allocator); try server.addClient(allocator, id, &client_state); defer server.removeClient(allocator, id); - while (client_state.next(allocator)) |msg| { + var msg_arena: std.heap.ArenaAllocator = .init(allocator); + defer msg_arena.deinit(); + const msg_allocator = msg_arena.allocator(); + + while (client_state.next(msg_allocator)) |msg| { + defer _ = msg_arena.reset(.retain_capacity); switch (msg) { .ping => { // Respond to ping with pong. @@ -115,10 +125,10 @@ fn handleConnection( }, .@"pub" => |pb| { defer { - allocator.free(pb.payload); - allocator.free(pb.subject); + msg_allocator.free(pb.payload); + msg_allocator.free(pb.subject); if (pb.reply_to) |r| { - allocator.free(r); + msg_allocator.free(r); } } try server.publishMessage(io, pb); @@ -128,19 +138,19 @@ fn handleConnection( }, .sub => |sub| { defer { - allocator.free(sub.subject); - allocator.free(sub.sid); + msg_allocator.free(sub.subject); + msg_allocator.free(sub.sid); if (sub.queue_group) |q| { - allocator.free(q); + msg_allocator.free(q); } } - try server.subscribe(allocator, id, sub); + try server.subscribe(server_allocator, id, sub); }, .unsub => |unsub| { defer { - allocator.free(unsub.sid); + msg_allocator.free(unsub.sid); } - try server.unsubscribe(id, unsub); + try server.unsubscribe(server_allocator, id, unsub); }, else => |e| { std.debug.panic("Unimplemented message: {any}\n", .{e}); @@ -161,52 +171,44 @@ fn handleConnection( // return acc.toOwnedSlice(); // } +fn subjectMatches(expected: []const u8, actual: []const u8) bool { + return std.mem.eql(u8, expected, actual); +} + fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { - if (server.subscriptions.get(msg.subject)) |subs| { - var subs_iter = subs.iterator(); - while (subs_iter.next()) |sub| { - const client_id = sub.key_ptr.*; - const sid = sub.value_ptr.*; - - const client = server.clients.getPtr(client_id) orelse { - std.debug.print("trying to publish to a client that no longer exists: {d}", .{client_id}); + for (server.subscriptions.items) |subscription| { + if (subjectMatches(subscription.subject, msg.subject)) { + const client = server.clients.get(subscription.client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); continue; }; - - _ = try client.*.send(io, .{ .msg = .{ + try client.send(io, .{ .msg = .{ .subject = msg.subject, - .sid = sid, + .sid = subscription.sid, .reply_to = msg.reply_to, .payload = msg.payload, } }); } - } else { - std.debug.print("no subs on {s}\n", .{msg.subject}); } } fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg}); - var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty; - try subs_for_subject.put(gpa, id, msg.sid); - try server.subscriptions.put(gpa, msg.subject, subs_for_subject); + try server.subscriptions.append(gpa, .{ + .subject = try gpa.dupe(u8, msg.subject), + .client_id = id, + .sid = try gpa.dupe(u8, msg.sid), + }); } -fn unsubscribe(server: *Server, id: usize, msg: Message.Unsub) !void { - // Get the subscription in subscriptions by looping over all the subjects, - // and getting the SID for that subject for the current client ID. - // If the SID matches, remove the kv for the client ID from subscriptions for that subject. - // If the value for that subject is empty, remove the subject. - var subscriptions_iter = server.subscriptions.iterator(); - while (subscriptions_iter.next()) |*subs_for_sub| { - if (subs_for_sub.value_ptr.get(id)) |client_sub| { - if (std.mem.eql(u8, client_sub, msg.sid)) { - _ = subs_for_sub.value_ptr.*.remove(id); - if (subs_for_sub.value_ptr.count() == 0) { - _ = server.subscriptions.remove(subs_for_sub.key_ptr.*); - } - break; - } +fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void { + const len = server.subscriptions.items.len; + for (0..len) |i| { + const sub = server.subscriptions.items[len - i]; + if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) { + gpa.free(sub.sid); + gpa.free(sub.subject); + _ = server.subscriptions.swapRemove(i); } } } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index d238120..2165691 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -104,6 +104,12 @@ pub const Message = union(MessageType) { reply_to: ?[]const u8 = null, /// The message payload data. payload: []const u8, + + pub fn deinit(self: Pub, alloc: std.mem.Allocator) void { + alloc.free(self.subject); + alloc.free(self.payload); + if (self.reply_to) |r| alloc.free(r); + } }; pub const Sub = struct { /// The subject name to subscribe to. @@ -112,6 +118,12 @@ pub const Message = union(MessageType) { queue_group: ?[]const u8, /// A unique alphanumeric subscription ID, generated by the client. sid: []const u8, + + pub fn deinit(self: Sub, alloc: std.mem.Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } }; pub const Unsub = struct { /// The unique alphanumeric subscription ID of the subject to unsubscribe from. @@ -124,6 +136,22 @@ pub const Message = union(MessageType) { sid: []const u8, reply_to: ?[]const u8, payload: []const u8, + + pub fn deinit(self: Msg, alloc: std.mem.Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + alloc.free(self.payload); + if (self.reply_to) |r| alloc.free(r); + } + + pub fn dupe(self: Msg, alloc: std.mem.Allocator) !Msg { + return .{ + .subject = try alloc.dupe(u8, self.subject), + .sid = try alloc.dupe(u8, self.sid), + .reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null, + .payload = try alloc.dupe(u8, self.payload), + }; + } }; const client_types = std.StaticStringMap(MessageType).initComptime( |
