diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-02 22:03:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-12-02 22:03:17 -0500 |
| commit | 4afdf32bebe14dc36f5eb6e4944ad69809f01537 (patch) | |
| tree | 4fdd7c17c4cd3e9f0c599de6f4f768add472c765 | |
| parent | aceb671ddc3e4ff3ce15c2e9814538e4f21d7d12 (diff) | |
| -rw-r--r-- | src/server/client.zig | 127 | ||||
| -rw-r--r-- | src/server/main.zig | 315 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 2 |
3 files changed, 263 insertions, 181 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 037a4fa..c8a9239 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -11,41 +11,130 @@ pub const ClientState = struct { recv_queue: std.Io.Queue(Message), recv_queue_buffer: [1024]Message = undefined, - /// Mapping of the subjects to the IDs. - subscription_ids: std.StringHashMapUnmanaged([]const u8) = .empty, + from_client: *std.Io.Reader, + to_client: *std.Io.Writer, - pub fn init(id: usize, connect: Message.Connect) ClientState { + write_task: std.Io.Future(void), + read_task: std.Io.Future(void), + + pub fn init( + io: std.Io, + allocator: std.mem.Allocator, + id: usize, + connect: Message.Connect, + in: *std.Io.Reader, + out: *std.Io.Writer, + ) ClientState { var res: ClientState = .{ .id = id, .connect = connect, .send_queue = undefined, .recv_queue = undefined, + .from_client = in, + .to_client = out, + .write_task = undefined, + .read_task = undefined, }; res.send_queue = .init(&res.send_queue_buffer); res.recv_queue = .init(&res.recv_queue_buffer); + const write_task = io.async(processWrite, .{ &res, io, out }); + // @compileLog(@TypeOf(write_task)); + const read_task = io.async(processRead, .{ &res, io, allocator, in }); + // @compileLog(@TypeOf(read_task)); + res.write_task = write_task; + res.read_task = read_task; + return res; } + fn processWrite( + self: *ClientState, + io: std.Io, + out: *std.Io.Writer, + ) void { + while (true) { + const message = self.recv_queue.getOne(io) catch break; + switch (message) { + .@"+ok" => writeOk(out) catch break, + .pong => writePong(out) catch break, + .info => |info| writeInfo(out, info) catch break, + .msg => |m| writeMsg(out, m) catch break, + else => std.debug.panic("unimplemented write", .{}), + } + } + } + + fn processRead( + self: *ClientState, + io: std.Io, + allocator: std.mem.Allocator, + in: *std.Io.Reader, + ) void { + while (true) { + const next_message = Message.next(allocator, in) catch |err| switch (err) { + error.EndOfStream => { + break; + }, + else => { + std.debug.panic("guh: {any}\n", .{err}); + break; + // return err; + }, + }; + self.send_queue.putOne(io, next_message) catch break; + } + } + pub fn deinit(self: *ClientState, alloc: std.mem.Allocator) void { - self.subscription_ids.clearAndFree(alloc); + _ = self; + // self.write_task.cancel() catch { + // std.debug.print("failed to cancel write task of {}\n", .{self.id}); + // }; + // self.read_taks.cancel() catch { + // std.debug.print("failed to cancel read task of {}\n", .{self.id}); + // }; + // TODO: dealloc things. + _ = alloc; } - pub fn subscribe(self: *ClientState, gpa: std.mem.Allocator, sub: Message.Sub) !void { - self.subscription_ids.lockPointers(); - defer self.subscription_ids.unlockPointers(); - try self.subscription_ids.putNoClobber(gpa, sub.subject, sub.sid); + /// Return true if the value was put in the clients buffer to process, else false. + pub fn send(self: *ClientState, io: std.Io, msg: Message) std.Io.Cancelable!bool { + return (try self.recv_queue.put(io, &.{msg}, 0)) > 0; } - pub fn unsubscribe(self: *ClientState, gpa: std.mem.Allocator, sid: []const u8) void { - _ = gpa; - self.subscription_ids.lockPointers(); - defer self.subscription_ids.unlockPointers(); - var iter = self.subscription_ids.iterator(); - while (iter.next()) |entry| { - if (std.mem.eql(u8, sid, entry.value_ptr.*)) { - self.subscription_ids.swapRemove(entry.value_ptr.*); - break; - } - } else unreachable; // Assert that the SID is in the subscriptions. + pub fn next(self: *ClientState, io: std.Io) std.Io.Cancelable!Message { + return self.send_queue.getOne(io); } }; + +fn writeOk(out: *std.Io.Writer) !void { + _ = try out.write("+OK\r\n"); + try out.flush(); +} + +fn writePong(out: *std.Io.Writer) !void { + _ = try out.write("PONG\r\n"); + try out.flush(); +} + +pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { + _ = try out.write("INFO "); + try std.json.Stringify.value(info, .{}, out); + _ = try out.write("\r\n"); + try out.flush(); +} + +fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { + std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); + try out.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + msg.subject, + msg.sid, + msg.reply_to orelse "", + msg.payload.len, + msg.payload, + }, + ); + try out.flush(); +} diff --git a/src/server/main.zig b/src/server/main.zig index 6d82e2d..d90bea8 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -29,11 +29,30 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { while (true) : (id +%= 1) { if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); - _ = io.async(handleConnection, .{ gpa, io, id, stream, &server }); + _ = io.async(handleConnection, .{ &server, gpa, io, id, stream }); } } -fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: std.Io.net.Stream, server: *Server) !void { +fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: ClientState) !void { + // server.clients.lockPointers(); + try server.clients.put(allocator, id, client); + // server.clients.unlockPointers(); +} + +fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void { + // TODO: implement + _ = server; + _ = allocator; + _ = id; +} + +fn handleConnection( + server: *Server, + allocator: std.mem.Allocator, + io: std.Io, + id: usize, + stream: std.Io.net.Stream, +) !void { defer stream.close(io); var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); @@ -43,15 +62,14 @@ fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: var reader = stream.reader(io, &r_buffer); const in = &reader.interface; - writeInfo(out, server.info) catch return; + @import("./client.zig").writeInfo(out, server.info) catch return; var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = .init(id, connect); + var client_state: ClientState = .init(io, allocator, id, connect, in, out); - // server.clients.lockPointers(); - try server.clients.put(allocator, id, client_state); - // server.clients.unlockPointers(); + try server.addClient(allocator, id, client_state); + defer server.removeClient(allocator, id); // defer { // server.clients.lockPointers(); @@ -73,91 +91,143 @@ fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: // server.subscriptions.unlockPointers(); // } - processClient(allocator, io, in, out, server, &client_state) catch |err| { + server.processClient(allocator, io, &client_state) catch |err| { std.debug.panic("Error processing client: {}\n", .{err}); }; } -fn processClient(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, out: *std.Io.Writer, server: *Server, client_state: *ClientState) !void { - var parse_task = io.async(parseMessages, .{ gpa, io, in, client_state }); - defer if (parse_task.cancel(io)) {} else |err| { - std.debug.print("Error canceling parse_task for {d}: {any}", .{ client_state.id, err }); - }; - var write_task = io.async(writeMessages, .{ io, out, server.*, client_state }); - defer if (write_task.cancel(io)) {} else |err| { - std.debug.print("Error canceling write_task for {d}: {any}", .{ client_state.id, err }); - }; +// // Result is owned by the caller +// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState { +// var acc: std.ArrayList(ClientState) = .empty; + +// return acc.toOwnedSlice(); +// } - while (!io.cancelRequested()) { - if (client_state.send_queue.getOne(io)) |msg| { - switch (msg) { - // TODO: REMOVE - .not_real => {}, - // Respond to ping with pong. - .ping => { - try client_state.recv_queue.putOne(io, .pong); - }, - .@"pub" => |p| { - std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); - std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); - std.debug.print("subs subjects:\n", .{}); - var key_iter = server.subscriptions.keyIterator(); - while (key_iter.next()) |k| { - std.debug.print("- {s}\n", .{k.*}); - } else std.debug.print("<none>\n", .{}); - std.debug.print("pub subject: '{s}'\n", .{p.subject}); - std.debug.print("pub: {any}\n", .{p}); - errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; - // Just publishing to exact matches right now. - // TODO: Publish to pattern matching subjects. - if (server.subscriptions.get(p.subject)) |subs| { - var subs_iter = subs.iterator(); - while (subs_iter.next()) |sub| { - var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); - std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); - if ((try client.recv_queue.put( - io, - &.{ - .{ - .msg = .{ - .subject = p.subject, - .sid = sub.value_ptr.*, - .reply_to = p.reply_to, - .payload = p.payload, - }, - }, - }, - 0, - )) > 0) { - std.debug.print("published message!\n", .{}); - } else { - std.debug.print("skipped publishing for some reason\n", .{}); - } - } - try client_state.recv_queue.putOne(io, .@"+ok"); - } else { - std.debug.print("no subs with subject\n", .{}); - } - }, - .sub => |s| { - var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); - if (!subscribers.found_existing) { - subscribers.value_ptr.* = .empty; - } - try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); - - std.debug.print("subs: {any}\n", .{server.subscriptions}); - }, - .info => |info| { - std.debug.panic("got an info message? : {any}\n", .{info}); - }, - else => |m| { - std.debug.panic("Unimplemented: {any}\n", .{m}); - }, - } - } else |err| return err; +fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { + if (server.subscriptions.get(msg.subject)) |subs| { + var subs_iter = subs.iterator(); + while (subs_iter.next()) |sub| { + const client_id = sub.key_ptr.*; + const sid = sub.value_ptr.*; + + var client = server.clients.getPtr(client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}", .{client_id}); + continue; + }; + + _ = try client.send(io, .{ .msg = .{ + .subject = msg.subject, + .sid = sid, + .reply_to = msg.reply_to, + .payload = msg.payload, + } }); + } + } else { + std.debug.print("no subs on {s}\n", .{msg.subject}); + } +} + +fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { + var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty; + try subs_for_subject.put(gpa, id, msg.sid); + try server.subscriptions.put(gpa, msg.subject, subs_for_subject); +} + +fn processClient(server: *Server, gpa: std.mem.Allocator, io: std.Io, client_state: *ClientState) !void { + defer client_state.deinit(gpa); + + while (true) { + switch (try client_state.next(io)) { + .ping => { + for (0..5) |_| { + if (try client_state.send(io, .pong)) break; + } else { + std.debug.print("could not pong to client {}\n", .{client_state.id}); + } + }, + .@"pub" => |msg| { + try server.publishMessage(io, msg); + if (client_state.connect.verbose) { + _ = try client_state.send(io, .@"+ok"); + } + }, + .sub => |msg| { + try server.subscribe(gpa, client_state.id, msg); + }, + else => |msg| { + std.debug.panic("Unimplemented message: {any}\n", .{msg}); + }, + } } + // while (!io.cancelRequested()) { + // if (client_state.send_queue.getOne(io)) |msg| { + // switch (msg) { + // // Respond to ping with pong. + // .ping => { + // try client_state.recv_queue.putOne(io, .pong); + // }, + // .@"pub" => |p| { + // std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); + // std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); + // std.debug.print("subs subjects:\n", .{}); + // var key_iter = server.subscriptions.keyIterator(); + // while (key_iter.next()) |k| { + // std.debug.print("- {s}\n", .{k.*}); + // } else std.debug.print("<none>\n", .{}); + // std.debug.print("pub subject: '{s}'\n", .{p.subject}); + // std.debug.print("pub: {any}\n", .{p}); + // errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; + // // Just publishing to exact matches right now. + // // TODO: Publish to pattern matching subjects. + // if (server.subscriptions.get(p.subject)) |subs| { + // var subs_iter = subs.iterator(); + // while (subs_iter.next()) |sub| { + // var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); + // std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); + // if ((try client.recv_queue.put( + // io, + // &.{ + // .{ + // .msg = .{ + // .subject = p.subject, + // .sid = sub.value_ptr.*, + // .reply_to = p.reply_to, + // .payload = p.payload, + // }, + // }, + // }, + // 0, + // )) > 0) { + // std.debug.print("published message!\n", .{}); + // } else { + // std.debug.print("skipped publishing for some reason\n", .{}); + // } + // } + // try client_state.recv_queue.putOne(io, .@"+ok"); + // } else { + // std.debug.print("no subs with subject\n", .{}); + // } + // }, + // .sub => |s| { + // var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); + // if (!subscribers.found_existing) { + // subscribers.value_ptr.* = .empty; + // } + // try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); + + // std.debug.print("subs: {any}\n", .{server.subscriptions}); + // }, + // .info => |info| { + // std.debug.panic("got an info message? : {any}\n", .{info}); + // }, + // else => |m| { + // std.debug.panic("Unimplemented: {any}\n", .{m}); + // }, + // } + // } else |err| return err; + // } + // while (true) { // switch (next_message) { // .connect => |connect| { @@ -170,77 +240,6 @@ fn processClient(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, out: *s // } } -fn parseMessages(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, client_state: *ClientState) !void { - // var message_parsing_arena: std.heap.ArenaAllocator = .init(); - // defer message_parsing_arena.deinit(); - // const message_parsing_allocator = message_parsing_arena.allocator(); - - while (!io.cancelRequested()) { - // defer _ = message_parsing_arena.reset(.retain_capacity); - const next_message = // Message.next(message_parsing_allocator, in) - Message.next(gpa, in) catch |err| { - switch (err) { - error.EndOfStream => { - break; - }, - else => { - return err; - }, - } - }; - std.debug.print("received message from client {d}: {any}\n'{s}'\n", .{ client_state.id, next_message, in.buffered() }); - try client_state.send_queue.putOne(io, next_message); - } -} - -fn writeMessages(io: std.Io, out: *std.Io.Writer, server: Server, client_state: *ClientState) !void { - while (true) { - std.debug.print("in writeMessage loop for {d}\n", .{client_state.id}); - if (client_state.recv_queue.getOne(io)) |msg| { - std.debug.print("attempting to write message for {d}: {any}\n", .{ client_state.id, msg }); - switch (msg) { - .@"+ok" => try writeOk(out), - .pong => try writePong(out), - .info => try writeInfo(out, server.info), - .msg => |m| try writeMsg(out, m), - else => std.debug.panic("unimplemented write", .{}), - } - } else |err| return err; - } -} - -fn writeOk(out: *std.Io.Writer) !void { - _ = try out.write("+OK\r\n"); - try out.flush(); -} - -fn writePong(out: *std.Io.Writer) !void { - _ = try out.write("PONG\r\n"); - try out.flush(); -} - -fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { - _ = try out.write("INFO "); - try std.json.Stringify.value(info, .{}, out); - _ = try out.write("\r\n"); - try out.flush(); -} - -fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { - std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); - try out.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - msg.subject, - msg.sid, - msg.reply_to orelse "", - msg.payload.len, - msg.payload, - }, - ); - try out.flush(); -} - pub fn createId() []const u8 { return "SERVERID"; } @@ -248,7 +247,3 @@ pub fn createId() []const u8 { pub fn createName() []const u8 { return "SERVERNAME"; } - -// test "handle pub" { -// const io = std.testing.io; -// } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 1a6b213..bee11bc 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -32,8 +32,6 @@ pub const MessageType = enum { }; pub const Message = union(enum) { - /// TODO: REMOVE - not_real: void, info: ServerInfo, connect: Connect, @"pub": Pub, |
