diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-09 21:19:12 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-12-10 00:11:12 -0500 |
| commit | 50870da1d9d5b80b01bb56461fc0c035bcd24b5b (patch) | |
| tree | a0b6afe1577a9f2ae6b4b6435ac8702c256d99fd /src/server/main.zig | |
| parent | 5fd580045d7a9b700005bbd9b85a1b3f40d1f374 (diff) | |
made some progress on subscriptions
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 211 |
1 files changed, 64 insertions, 147 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index 2ebf96d..e1f9891 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -53,11 +53,18 @@ fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void { fn handleConnection( server: *Server, - allocator: std.mem.Allocator, + server_allocator: std.mem.Allocator, io: std.Io, id: usize, stream: std.Io.net.Stream, ) !void { + _ = server_allocator; + var client_allocator: std.heap.DebugAllocator(.{}) = .init; + defer { + std.debug.print("deinitializing debug allocator\n", .{}); + _ = client_allocator.deinit(); + } + const allocator = client_allocator.allocator(); defer stream.close(io); var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); @@ -79,155 +86,46 @@ fn handleConnection( try server.addClient(allocator, id, &client_state); defer server.removeClient(allocator, id); - // defer { - // server.clients.lockPointers(); - // server.clients.remove(allocator, id); - // server.clients.unlockPointers(); - // server.subscriptions.lockPointers(); - // var sub_iter = server.subscriptions.iterator(); - // var to_free: std.ArrayList(usize) = .empty; - // defer to_free.deinit(allocator); - // while (sub_iter.next()) |sub| { - // while (std.simd.firstIndexOfValue(sub.value_ptr.*, id)) |i| { - // sub.value_ptr.*.orderedRemove(i); - // } - // if (sub.value_ptr.items.len == 0) { - // to_free.append(allocator, sub.index); - // } - // } - // server.subscriptions.orderedRemoveAtMany(allocator, to_free.items); - // server.subscriptions.unlockPointers(); - // } - - { - defer std.debug.print("done processing client??\n", .{}); - std.debug.print("processing client: {d}\n", .{client_state.id}); - - std.debug.print("awaiting next message from client\n", .{}); - while (client_state.next(allocator)) |msg| { - std.debug.print("message from client!: {any}\n", .{msg}); - switch (msg) { - .ping => { - std.debug.print("got a ping! sending a pong.\n", .{}); - - std.debug.print("recv queue in server loop: {*}\n", .{&client_state.recv_queue}); - // @import("./client.zig").writePong(out) catch return; - for (0..5) |_| { - if (try client_state.send(io, .pong)) { - std.debug.print("sent pong\n", .{}); - break; - } - std.debug.print("trying to send a pong again.\n", .{}); - } else { - std.debug.print("could not pong to client {d}\n", .{client_state.id}); + while (client_state.next(allocator)) |msg| { + std.debug.print("message from client: {any}\n", .{msg}); + switch (msg) { + .ping => { + // Respond to ping with pong. + for (0..5) |_| { + if (try client_state.send(io, .pong)) { + break; } - }, - .@"pub" => |@"pub"| { - std.debug.print("pub: {any}\n", .{@"pub"}); - try server.publishMessage(io, @"pub"); - if (client_state.connect.connect.verbose) { - std.debug.print("server IS sending +ok\n", .{}); - _ = try client_state.send(io, .@"+ok"); - } else { - std.debug.print("server NOT sending +ok\n", .{}); - } - }, - .sub => |sub| { - try server.subscribe(allocator, client_state.id, sub); - }, - .eos => { - break; - }, - else => |e| { - std.debug.panic("Unimplemented message: {any}\n", .{e}); - }, - } - - std.debug.print("processed message from client\n", .{}); - std.debug.print("awaiting next message from client\n", .{}); - } else |_| {} - - // while (!io.cancelRequested()) { - // if (client_state.send_queue.getOne(io)) |msg| { - // switch (msg) { - // // Respond to ping with pong. - // .ping => { - // try client_state.recv_queue.putOne(io, .pong); - // }, - // .@"pub" => |p| { - // std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); - // std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); - // std.debug.print("subs subjects:\n", .{}); - // var key_iter = server.subscriptions.keyIterator(); - // while (key_iter.next()) |k| { - // std.debug.print("- {s}\n", .{k.*}); - // } else std.debug.print("<none>\n", .{}); - // std.debug.print("pub subject: '{s}'\n", .{p.subject}); - // std.debug.print("pub: {any}\n", .{p}); - // errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; - // // Just publishing to exact matches right now. - // // TODO: Publish to pattern matching subjects. - // if (server.subscriptions.get(p.subject)) |subs| { - // var subs_iter = subs.iterator(); - // while (subs_iter.next()) |sub| { - // var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); - // std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); - // if ((try client.recv_queue.put( - // io, - // &.{ - // .{ - // .msg = .{ - // .subject = p.subject, - // .sid = sub.value_ptr.*, - // .reply_to = p.reply_to, - // .payload = p.payload, - // }, - // }, - // }, - // 0, - // )) > 0) { - // std.debug.print("published message!\n", .{}); - // } else { - // std.debug.print("skipped publishing for some reason\n", .{}); - // } - // } - // try client_state.recv_queue.putOne(io, .@"+ok"); - // } else { - // std.debug.print("no subs with subject\n", .{}); - // } - // }, - // .sub => |s| { - // var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); - // if (!subscribers.found_existing) { - // subscribers.value_ptr.* = .empty; - // } - // try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); - - // std.debug.print("subs: {any}\n", .{server.subscriptions}); - // }, - // .info => |info| { - // std.debug.panic("got an info message? : {any}\n", .{info}); - // }, - // else => |m| { - // std.debug.panic("Unimplemented: {any}\n", .{m}); - // }, - // } - // } else |err| return err; - // } + } else {} + }, + .@"pub" => |@"pub"| { + std.debug.print("pub: {any}\n", .{@"pub"}); + try server.publishMessage(io, @"pub"); + if (client_state.connect.connect.verbose) { + std.debug.print("server IS sending +ok\n", .{}); + _ = try client_state.send(io, .@"+ok"); + } else { + std.debug.print("server NOT sending +ok\n", .{}); + } + }, + .sub => |sub| { + try server.subscribe(allocator, client_state.id, sub); + }, + .unsub => |unsub| { + try server.unsubscribe(client_state.id, unsub); + }, + .eos => { + break; + }, + else => |e| { + std.debug.panic("Unimplemented message: {any}\n", .{e}); + }, + } - // while (true) { - // switch (next_message) { - // .connect => |connect| { - // std.debug.panic("Connection message after already connected: {any}\n", .{connect}); - // }, - // .ping => try writePong(out), - // .@"pub" => try writeOk(out), - // else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), - // } - // } - } + std.debug.print("processed message from client\n", .{}); + std.debug.print("awaiting next message from client\n", .{}); + } else |_| {} - client_state.task.await(io); + // client_state.task.await(io); } // // Result is owned by the caller @@ -267,6 +165,25 @@ fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Su try server.subscriptions.put(gpa, msg.subject, subs_for_subject); } +fn unsubscribe(server: *Server, id: usize, msg: Message.Unsub) !void { + // Get the subscription in subscriptions by looping over all the subjects, + // and getting the SID for that subject for the current client ID. + // If the SID matches, remove the kv for the client ID from subscriptions for that subject. + // If the value for that subject is empty, remove the subject. + var subscriptions_iter = server.subscriptions.iterator(); + while (subscriptions_iter.next()) |*subs_for_sub| { + if (subs_for_sub.value_ptr.get(id)) |client_sub| { + if (std.mem.eql(u8, client_sub, msg.sid)) { + _ = subs_for_sub.value_ptr.*.remove(id); + if (subs_for_sub.value_ptr.count() == 0) { + _ = server.subscriptions.remove(subs_for_sub.key_ptr.*); + } + break; + } + } + } +} + pub fn createId() []const u8 { return "SERVERID"; } |
