summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/main.zig28
1 files changed, 21 insertions, 7 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index 7328258..a78bab2 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -13,6 +13,8 @@ const Subscription = struct {
info: ServerInfo,
clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
+
+subs_lock: std.Thread.Mutex = .{},
subscriptions: std.ArrayList(Subscription) = .empty,
var keep_running = std.atomic.Value(bool).init(true);
@@ -70,10 +72,18 @@ fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *
}
fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void {
- // TODO: implement
- _ = server;
- _ = allocator;
- _ = id;
+ server.subs_lock.lock();
+ defer server.subs_lock.unlock();
+ _ = server.clients.remove(id);
+ const len = server.subscriptions.items.len;
+ for (0..len) |i| {
+ const sub = server.subscriptions.items[len - i - 1];
+ if (sub.client_id == id) {
+ allocator.free(sub.sid);
+ allocator.free(sub.subject);
+ _ = server.subscriptions.swapRemove(i);
+ }
+ }
}
fn handleConnection(
@@ -109,8 +119,8 @@ fn handleConnection(
try client_state.start(io);
defer client_state.deinit(io, allocator);
- try server.addClient(allocator, id, &client_state);
- defer server.removeClient(allocator, id);
+ try server.addClient(server_allocator, id, &client_state);
+ defer server.removeClient(server_allocator, id);
var msg_arena: std.heap.ArenaAllocator = .init(allocator);
defer msg_arena.deinit();
@@ -194,6 +204,8 @@ fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg});
+ server.subs_lock.lock();
+ defer server.subs_lock.unlock();
try server.subscriptions.append(gpa, .{
.subject = try gpa.dupe(u8, msg.subject),
.client_id = id,
@@ -202,9 +214,11 @@ fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Su
}
fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void {
+ server.subs_lock.lock();
+ defer server.subs_lock.unlock();
const len = server.subscriptions.items.len;
for (0..len) |i| {
- const sub = server.subscriptions.items[len - i];
+ const sub = server.subscriptions.items[len - i - 1];
if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
gpa.free(sub.sid);
gpa.free(sub.subject);