diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-31 03:11:42 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:22:02 +0000 |
| commit | a78187603871238c154d2fec105eecbb15cee225 (patch) | |
| tree | 03dbd800f403eb51d9173166bbeb9dbf5bbe3341 /src/server/main.zig | |
| parent | 0f138e5984cbebe64bc398513597d62f4e879b05 (diff) | |
Simplified a ton of things and cleaned up ownership
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 90 |
1 files changed, 46 insertions, 44 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index f68e5d3..7328258 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -5,10 +5,15 @@ pub const ServerInfo = Message.ServerInfo; const ClientState = @import("./client.zig").ClientState; const Server = @This(); +const Subscription = struct { + subject: []const u8, + client_id: usize, + sid: []const u8, +}; + info: ServerInfo, clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty, -/// Map of subjects to a map of (client ID => SID) -subscriptions: std.StringHashMapUnmanaged(std.AutoHashMapUnmanaged(usize, []const u8)) = .empty, +subscriptions: std.ArrayList(Subscription) = .empty, var keep_running = std.atomic.Value(bool).init(true); @@ -100,14 +105,19 @@ fn handleConnection( var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = try .init(connect, in, out); + var client_state: ClientState = try .init(connect, allocator, in, out); try client_state.start(io); defer client_state.deinit(io, allocator); try server.addClient(allocator, id, &client_state); defer server.removeClient(allocator, id); - while (client_state.next(allocator)) |msg| { + var msg_arena: std.heap.ArenaAllocator = .init(allocator); + defer msg_arena.deinit(); + const msg_allocator = msg_arena.allocator(); + + while (client_state.next(msg_allocator)) |msg| { + defer _ = msg_arena.reset(.retain_capacity); switch (msg) { .ping => { // Respond to ping with pong. @@ -115,10 +125,10 @@ fn handleConnection( }, .@"pub" => |pb| { defer { - allocator.free(pb.payload); - allocator.free(pb.subject); + msg_allocator.free(pb.payload); + msg_allocator.free(pb.subject); if (pb.reply_to) |r| { - allocator.free(r); + msg_allocator.free(r); } } try server.publishMessage(io, pb); @@ -128,19 +138,19 @@ fn handleConnection( }, .sub => |sub| { defer { - allocator.free(sub.subject); - allocator.free(sub.sid); + msg_allocator.free(sub.subject); + msg_allocator.free(sub.sid); if (sub.queue_group) |q| { - allocator.free(q); + msg_allocator.free(q); } } - try server.subscribe(allocator, id, sub); + try server.subscribe(server_allocator, id, sub); }, .unsub => |unsub| { defer { - allocator.free(unsub.sid); + msg_allocator.free(unsub.sid); } - try server.unsubscribe(id, unsub); + try server.unsubscribe(server_allocator, id, unsub); }, else => |e| { std.debug.panic("Unimplemented message: {any}\n", .{e}); @@ -161,52 +171,44 @@ fn handleConnection( // return acc.toOwnedSlice(); // } +fn subjectMatches(expected: []const u8, actual: []const u8) bool { + return std.mem.eql(u8, expected, actual); +} + fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { - if (server.subscriptions.get(msg.subject)) |subs| { - var subs_iter = subs.iterator(); - while (subs_iter.next()) |sub| { - const client_id = sub.key_ptr.*; - const sid = sub.value_ptr.*; - - const client = server.clients.getPtr(client_id) orelse { - std.debug.print("trying to publish to a client that no longer exists: {d}", .{client_id}); + for (server.subscriptions.items) |subscription| { + if (subjectMatches(subscription.subject, msg.subject)) { + const client = server.clients.get(subscription.client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); continue; }; - - _ = try client.*.send(io, .{ .msg = .{ + try client.send(io, .{ .msg = .{ .subject = msg.subject, - .sid = sid, + .sid = subscription.sid, .reply_to = msg.reply_to, .payload = msg.payload, } }); } - } else { - std.debug.print("no subs on {s}\n", .{msg.subject}); } } fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg}); - var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty; - try subs_for_subject.put(gpa, id, msg.sid); - try server.subscriptions.put(gpa, msg.subject, subs_for_subject); + try server.subscriptions.append(gpa, .{ + .subject = try gpa.dupe(u8, msg.subject), + .client_id = id, + .sid = try gpa.dupe(u8, msg.sid), + }); } -fn unsubscribe(server: *Server, id: usize, msg: Message.Unsub) !void { - // Get the subscription in subscriptions by looping over all the subjects, - // and getting the SID for that subject for the current client ID. - // If the SID matches, remove the kv for the client ID from subscriptions for that subject. - // If the value for that subject is empty, remove the subject. - var subscriptions_iter = server.subscriptions.iterator(); - while (subscriptions_iter.next()) |*subs_for_sub| { - if (subs_for_sub.value_ptr.get(id)) |client_sub| { - if (std.mem.eql(u8, client_sub, msg.sid)) { - _ = subs_for_sub.value_ptr.*.remove(id); - if (subs_for_sub.value_ptr.count() == 0) { - _ = server.subscriptions.remove(subs_for_sub.key_ptr.*); - } - break; - } +fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void { + const len = server.subscriptions.items.len; + for (0..len) |i| { + const sub = server.subscriptions.items[len - i]; + if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) { + gpa.free(sub.sid); + gpa.free(sub.subject); + _ = server.subscriptions.swapRemove(i); } } } |
