const std = @import("std"); const Message = @import("./message_parser.zig").Message; pub const ServerInfo = Message.ServerInfo; const ClientState = @import("./client.zig").ClientState; const Server = @This(); info: ServerInfo, clients: std.AutoHashMapUnmanaged(usize, ClientState) = .empty, /// Map of subjects to a map of (client ID => SID) subscriptions: std.StringHashMapUnmanaged(std.AutoHashMapUnmanaged(usize, []const u8)) = .empty, pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { var server: Server = .{ .info = server_config, }; var threaded: std.Io.Threaded = .init(gpa); defer threaded.deinit(); const io = threaded.io(); var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( server.info.host, server.info.port, ), io, .{}); defer tcp_server.deinit(io); var id: usize = 0; while (true) : (id +%= 1) { if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); _ = io.async(handleConnection, .{ gpa, io, id, stream, &server }); } } fn handleConnection(allocator: std.mem.Allocator, io: std.Io, id: usize, stream: std.Io.net.Stream, server: *Server) !void { defer stream.close(io); var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); const out = &writer.interface; var r_buffer: [8192]u8 = undefined; var reader = stream.reader(io, &r_buffer); const in = &reader.interface; writeInfo(out, server.info) catch return; var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; var client_state: ClientState = .init(id, connect); // server.clients.lockPointers(); try server.clients.put(allocator, id, client_state); // server.clients.unlockPointers(); // defer { // server.clients.lockPointers(); // server.clients.remove(allocator, id); // server.clients.unlockPointers(); // server.subscriptions.lockPointers(); // var sub_iter = server.subscriptions.iterator(); // var to_free: std.ArrayList(usize) = .empty; // defer to_free.deinit(allocator); // while (sub_iter.next()) |sub| { // while (std.simd.firstIndexOfValue(sub.value_ptr.*, id)) |i| { // sub.value_ptr.*.orderedRemove(i); // } // if (sub.value_ptr.items.len == 0) { // to_free.append(allocator, sub.index); // } // } // server.subscriptions.orderedRemoveAtMany(allocator, to_free.items); // server.subscriptions.unlockPointers(); // } processClient(allocator, io, in, out, server, &client_state) catch |err| { std.debug.panic("Error processing client: {}\n", .{err}); }; } fn processClient(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, out: *std.Io.Writer, server: *Server, client_state: *ClientState) !void { var parse_task = io.async(parseMessages, .{ gpa, io, in, client_state }); defer if (parse_task.cancel(io)) {} else |err| { std.debug.print("Error canceling parse_task for {d}: {any}", .{ client_state.id, err }); }; var write_task = io.async(writeMessages, .{ io, out, server.*, client_state }); defer if (write_task.cancel(io)) {} else |err| { std.debug.print("Error canceling write_task for {d}: {any}", .{ client_state.id, err }); }; while (!io.cancelRequested()) { if (client_state.send_queue.getOne(io)) |msg| { switch (msg) { // TODO: REMOVE .not_real => {}, // Respond to ping with pong. .ping => { try client_state.recv_queue.putOne(io, .pong); }, .@"pub" => |p| { std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); std.debug.print("subs subjects:\n", .{}); var key_iter = server.subscriptions.keyIterator(); while (key_iter.next()) |k| { std.debug.print("- {s}\n", .{k.*}); } else std.debug.print("\n", .{}); std.debug.print("pub subject: '{s}'\n", .{p.subject}); std.debug.print("pub: {any}\n", .{p}); errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; // Just publishing to exact matches right now. // TODO: Publish to pattern matching subjects. if (server.subscriptions.get(p.subject)) |subs| { var subs_iter = subs.iterator(); while (subs_iter.next()) |sub| { var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); if ((try client.recv_queue.put( io, &.{ .{ .msg = .{ .subject = p.subject, .sid = sub.value_ptr.*, .reply_to = p.reply_to, .payload = p.payload, }, }, }, 0, )) > 0) { std.debug.print("published message!\n", .{}); } else { std.debug.print("skipped publishing for some reason\n", .{}); } } try client_state.recv_queue.putOne(io, .@"+ok"); } else { std.debug.print("no subs with subject\n", .{}); } }, .sub => |s| { var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); if (!subscribers.found_existing) { subscribers.value_ptr.* = .empty; } try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); std.debug.print("subs: {any}\n", .{server.subscriptions}); }, .info => |info| { std.debug.panic("got an info message? : {any}\n", .{info}); }, else => |m| { std.debug.panic("Unimplemented: {any}\n", .{m}); }, } } else |err| return err; } // while (true) { // switch (next_message) { // .connect => |connect| { // std.debug.panic("Connection message after already connected: {any}\n", .{connect}); // }, // .ping => try writePong(out), // .@"pub" => try writeOk(out), // else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), // } // } } fn parseMessages(gpa: std.mem.Allocator, io: std.Io, in: *std.Io.Reader, client_state: *ClientState) !void { // var message_parsing_arena: std.heap.ArenaAllocator = .init(); // defer message_parsing_arena.deinit(); // const message_parsing_allocator = message_parsing_arena.allocator(); while (!io.cancelRequested()) { // defer _ = message_parsing_arena.reset(.retain_capacity); const next_message = // Message.next(message_parsing_allocator, in) Message.next(gpa, in) catch |err| { switch (err) { error.EndOfStream => { break; }, else => { return err; }, } }; std.debug.print("received message from client {d}: {any}\n'{s}'\n", .{ client_state.id, next_message, in.buffered() }); try client_state.send_queue.putOne(io, next_message); } } fn writeMessages(io: std.Io, out: *std.Io.Writer, server: Server, client_state: *ClientState) !void { while (true) { std.debug.print("in writeMessage loop for {d}\n", .{client_state.id}); if (client_state.recv_queue.getOne(io)) |msg| { std.debug.print("attempting to write message for {d}: {any}\n", .{ client_state.id, msg }); switch (msg) { .@"+ok" => try writeOk(out), .pong => try writePong(out), .info => try writeInfo(out, server.info), .msg => |m| try writeMsg(out, m), else => std.debug.panic("unimplemented write", .{}), } } else |err| return err; } } fn writeOk(out: *std.Io.Writer) !void { _ = try out.write("+OK\r\n"); try out.flush(); } fn writePong(out: *std.Io.Writer) !void { _ = try out.write("PONG\r\n"); try out.flush(); } fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { _ = try out.write("INFO "); try std.json.Stringify.value(info, .{}, out); _ = try out.write("\r\n"); try out.flush(); } fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); try out.print( "MSG {s} {s} {s} {d}\r\n{s}\r\n", .{ msg.subject, msg.sid, msg.reply_to orelse "", msg.payload.len, msg.payload, }, ); try out.flush(); } pub fn createId() []const u8 { return "SERVERID"; } pub fn createName() []const u8 { return "SERVERNAME"; } // test "handle pub" { // const io = std.testing.io; // }