From 4afdf32bebe14dc36f5eb6e4944ad69809f01537 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Tue, 2 Dec 2025 22:03:17 -0500 Subject: --- src/server/main.zig | 315 ++++++++++++++++++++++++++-------------------------- 1 file changed, 155 insertions(+), 160 deletions(-) (limited to 'src/server/main.zig') diff --git a/src/server/main.zig b/src/server/main.zig index 6d82e2d..d90bea8 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -29,11 +29,30 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { while (true) : (id +%= 1) { if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); - _ = io.async(handleConnection, .{ gpa, io, id, stream, &server }); + _ = io.async(handleConnection, .{ &server, gpa, io, id, stream }); } } -fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: std.Io.net.Stream, server: *Server) !void { +fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: ClientState) !void { + // server.clients.lockPointers(); + try server.clients.put(allocator, id, client); + // server.clients.unlockPointers(); +} + +fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void { + // TODO: implement + _ = server; + _ = allocator; + _ = id; +} + +fn handleConnection( + server: *Server, + allocator: std.mem.Allocator, + io: std.Io, + id: usize, + stream: std.Io.net.Stream, +) !void { defer stream.close(io); var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); @@ -43,15 +62,14 @@ fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: var reader = stream.reader(io, &r_buffer); const in = &reader.interface; - writeInfo(out, server.info) catch return; + @import("./client.zig").writeInfo(out, server.info) catch return; var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = .init(id, connect); + var client_state: ClientState = .init(io, allocator, id, connect, in, out); - // server.clients.lockPointers(); - try server.clients.put(allocator, id, client_state); - // server.clients.unlockPointers(); + try server.addClient(allocator, id, client_state); + defer server.removeClient(allocator, id); // defer { // server.clients.lockPointers(); @@ -73,91 +91,143 @@ fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: // server.subscriptions.unlockPointers(); // } - processClient(allocator, io, in, out, server, &client_state) catch |err| { + server.processClient(allocator, io, &client_state) catch |err| { std.debug.panic("Error processing client: {}\n", .{err}); }; } -fn processClient(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, out: *std.Io.Writer, server: *Server, client_state: *ClientState) !void { - var parse_task = io.async(parseMessages, .{ gpa, io, in, client_state }); - defer if (parse_task.cancel(io)) {} else |err| { - std.debug.print("Error canceling parse_task for {d}: {any}", .{ client_state.id, err }); - }; - var write_task = io.async(writeMessages, .{ io, out, server.*, client_state }); - defer if (write_task.cancel(io)) {} else |err| { - std.debug.print("Error canceling write_task for {d}: {any}", .{ client_state.id, err }); - }; +// // Result is owned by the caller +// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState { +// var acc: std.ArrayList(ClientState) = .empty; + +// return acc.toOwnedSlice(); +// } - while (!io.cancelRequested()) { - if (client_state.send_queue.getOne(io)) |msg| { - switch (msg) { - // TODO: REMOVE - .not_real => {}, - // Respond to ping with pong. - .ping => { - try client_state.recv_queue.putOne(io, .pong); - }, - .@"pub" => |p| { - std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); - std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); - std.debug.print("subs subjects:\n", .{}); - var key_iter = server.subscriptions.keyIterator(); - while (key_iter.next()) |k| { - std.debug.print("- {s}\n", .{k.*}); - } else std.debug.print("\n", .{}); - std.debug.print("pub subject: '{s}'\n", .{p.subject}); - std.debug.print("pub: {any}\n", .{p}); - errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; - // Just publishing to exact matches right now. - // TODO: Publish to pattern matching subjects. - if (server.subscriptions.get(p.subject)) |subs| { - var subs_iter = subs.iterator(); - while (subs_iter.next()) |sub| { - var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); - std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); - if ((try client.recv_queue.put( - io, - &.{ - .{ - .msg = .{ - .subject = p.subject, - .sid = sub.value_ptr.*, - .reply_to = p.reply_to, - .payload = p.payload, - }, - }, - }, - 0, - )) > 0) { - std.debug.print("published message!\n", .{}); - } else { - std.debug.print("skipped publishing for some reason\n", .{}); - } - } - try client_state.recv_queue.putOne(io, .@"+ok"); - } else { - std.debug.print("no subs with subject\n", .{}); - } - }, - .sub => |s| { - var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); - if (!subscribers.found_existing) { - subscribers.value_ptr.* = .empty; - } - try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); - - std.debug.print("subs: {any}\n", .{server.subscriptions}); - }, - .info => |info| { - std.debug.panic("got an info message? : {any}\n", .{info}); - }, - else => |m| { - std.debug.panic("Unimplemented: {any}\n", .{m}); - }, - } - } else |err| return err; +fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { + if (server.subscriptions.get(msg.subject)) |subs| { + var subs_iter = subs.iterator(); + while (subs_iter.next()) |sub| { + const client_id = sub.key_ptr.*; + const sid = sub.value_ptr.*; + + var client = server.clients.getPtr(client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}", .{client_id}); + continue; + }; + + _ = try client.send(io, .{ .msg = .{ + .subject = msg.subject, + .sid = sid, + .reply_to = msg.reply_to, + .payload = msg.payload, + } }); + } + } else { + std.debug.print("no subs on {s}\n", .{msg.subject}); + } +} + +fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { + var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty; + try subs_for_subject.put(gpa, id, msg.sid); + try server.subscriptions.put(gpa, msg.subject, subs_for_subject); +} + +fn processClient(server: *Server, gpa: std.mem.Allocator, io: std.Io, client_state: *ClientState) !void { + defer client_state.deinit(gpa); + + while (true) { + switch (try client_state.next(io)) { + .ping => { + for (0..5) |_| { + if (try client_state.send(io, .pong)) break; + } else { + std.debug.print("could not pong to client {}\n", .{client_state.id}); + } + }, + .@"pub" => |msg| { + try server.publishMessage(io, msg); + if (client_state.connect.verbose) { + _ = try client_state.send(io, .@"+ok"); + } + }, + .sub => |msg| { + try server.subscribe(gpa, client_state.id, msg); + }, + else => |msg| { + std.debug.panic("Unimplemented message: {any}\n", .{msg}); + }, + } } + // while (!io.cancelRequested()) { + // if (client_state.send_queue.getOne(io)) |msg| { + // switch (msg) { + // // Respond to ping with pong. + // .ping => { + // try client_state.recv_queue.putOne(io, .pong); + // }, + // .@"pub" => |p| { + // std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); + // std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); + // std.debug.print("subs subjects:\n", .{}); + // var key_iter = server.subscriptions.keyIterator(); + // while (key_iter.next()) |k| { + // std.debug.print("- {s}\n", .{k.*}); + // } else std.debug.print("\n", .{}); + // std.debug.print("pub subject: '{s}'\n", .{p.subject}); + // std.debug.print("pub: {any}\n", .{p}); + // errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; + // // Just publishing to exact matches right now. + // // TODO: Publish to pattern matching subjects. + // if (server.subscriptions.get(p.subject)) |subs| { + // var subs_iter = subs.iterator(); + // while (subs_iter.next()) |sub| { + // var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); + // std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); + // if ((try client.recv_queue.put( + // io, + // &.{ + // .{ + // .msg = .{ + // .subject = p.subject, + // .sid = sub.value_ptr.*, + // .reply_to = p.reply_to, + // .payload = p.payload, + // }, + // }, + // }, + // 0, + // )) > 0) { + // std.debug.print("published message!\n", .{}); + // } else { + // std.debug.print("skipped publishing for some reason\n", .{}); + // } + // } + // try client_state.recv_queue.putOne(io, .@"+ok"); + // } else { + // std.debug.print("no subs with subject\n", .{}); + // } + // }, + // .sub => |s| { + // var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); + // if (!subscribers.found_existing) { + // subscribers.value_ptr.* = .empty; + // } + // try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); + + // std.debug.print("subs: {any}\n", .{server.subscriptions}); + // }, + // .info => |info| { + // std.debug.panic("got an info message? : {any}\n", .{info}); + // }, + // else => |m| { + // std.debug.panic("Unimplemented: {any}\n", .{m}); + // }, + // } + // } else |err| return err; + // } + // while (true) { // switch (next_message) { // .connect => |connect| { @@ -170,77 +240,6 @@ fn processClient(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, out: *s // } } -fn parseMessages(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, client_state: *ClientState) !void { - // var message_parsing_arena: std.heap.ArenaAllocator = .init(); - // defer message_parsing_arena.deinit(); - // const message_parsing_allocator = message_parsing_arena.allocator(); - - while (!io.cancelRequested()) { - // defer _ = message_parsing_arena.reset(.retain_capacity); - const next_message = // Message.next(message_parsing_allocator, in) - Message.next(gpa, in) catch |err| { - switch (err) { - error.EndOfStream => { - break; - }, - else => { - return err; - }, - } - }; - std.debug.print("received message from client {d}: {any}\n'{s}'\n", .{ client_state.id, next_message, in.buffered() }); - try client_state.send_queue.putOne(io, next_message); - } -} - -fn writeMessages(io: std.Io, out: *std.Io.Writer, server: Server, client_state: *ClientState) !void { - while (true) { - std.debug.print("in writeMessage loop for {d}\n", .{client_state.id}); - if (client_state.recv_queue.getOne(io)) |msg| { - std.debug.print("attempting to write message for {d}: {any}\n", .{ client_state.id, msg }); - switch (msg) { - .@"+ok" => try writeOk(out), - .pong => try writePong(out), - .info => try writeInfo(out, server.info), - .msg => |m| try writeMsg(out, m), - else => std.debug.panic("unimplemented write", .{}), - } - } else |err| return err; - } -} - -fn writeOk(out: *std.Io.Writer) !void { - _ = try out.write("+OK\r\n"); - try out.flush(); -} - -fn writePong(out: *std.Io.Writer) !void { - _ = try out.write("PONG\r\n"); - try out.flush(); -} - -fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { - _ = try out.write("INFO "); - try std.json.Stringify.value(info, .{}, out); - _ = try out.write("\r\n"); - try out.flush(); -} - -fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { - std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); - try out.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - msg.subject, - msg.sid, - msg.reply_to orelse "", - msg.payload.len, - msg.payload, - }, - ); - try out.flush(); -} - pub fn createId() []const u8 { return "SERVERID"; } @@ -248,7 +247,3 @@ pub fn createId() []const u8 { pub fn createName() []const u8 { return "SERVERNAME"; } - -// test "handle pub" { -// const io = std.testing.io; -// } -- cgit