diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-31 03:29:32 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:22:02 +0000 |
| commit | 7af7a30ed21663d510d368d0f936b08285bf092b (patch) | |
| tree | 8b86f1173b658d62a81715ef272933a48d0e0fa1 | |
| parent | a78187603871238c154d2fec105eecbb15cee225 (diff) | |
things are running quite smoothly!!!compare-to-this
coder@08714a4174bb:~$ nats bench sub foo -s localhost:4223
03:28:04 Starting Core NATS subscriber benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, subject=foo]
03:28:04 [1] Starting Core NATS subscriber, expecting 100,000 messages
Finished 6s [====================================================================================] 100%
NATS Core NATS subscriber stats: 14,691 msgs/sec ~ 1.8 MiB/sec ~ 68.06us
| -rw-r--r-- | src/server/main.zig | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index 7328258..a78bab2 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -13,6 +13,8 @@ const Subscription = struct { info: ServerInfo, clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty, + +subs_lock: std.Thread.Mutex = .{}, subscriptions: std.ArrayList(Subscription) = .empty, var keep_running = std.atomic.Value(bool).init(true); @@ -70,10 +72,18 @@ fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: * } fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void { - // TODO: implement - _ = server; - _ = allocator; - _ = id; + server.subs_lock.lock(); + defer server.subs_lock.unlock(); + _ = server.clients.remove(id); + const len = server.subscriptions.items.len; + for (0..len) |i| { + const sub = server.subscriptions.items[len - i - 1]; + if (sub.client_id == id) { + allocator.free(sub.sid); + allocator.free(sub.subject); + _ = server.subscriptions.swapRemove(i); + } + } } fn handleConnection( @@ -109,8 +119,8 @@ fn handleConnection( try client_state.start(io); defer client_state.deinit(io, allocator); - try server.addClient(allocator, id, &client_state); - defer server.removeClient(allocator, id); + try server.addClient(server_allocator, id, &client_state); + defer server.removeClient(server_allocator, id); var msg_arena: std.heap.ArenaAllocator = .init(allocator); defer msg_arena.deinit(); @@ -194,6 +204,8 @@ fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg}); + server.subs_lock.lock(); + defer server.subs_lock.unlock(); try server.subscriptions.append(gpa, .{ .subject = try gpa.dupe(u8, msg.subject), .client_id = id, @@ -202,9 +214,11 @@ fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Su } fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void { + server.subs_lock.lock(); + defer server.subs_lock.unlock(); const len = server.subscriptions.items.len; for (0..len) |i| { - const sub = server.subscriptions.items[len - i]; + const sub = server.subscriptions.items[len - i - 1]; if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) { gpa.free(sub.sid); gpa.free(sub.subject); |
