//! Publish/subscribe message server.
//!
//! This file is a struct (`Server`). It accepts TCP connections, performs an
//! info/connect handshake with each peer, and then routes published messages
//! to the clients subscribed to the exact same subject.
//!
//! NOTE(review): connection handlers are started with `io.concurrent` and all
//! of them touch `clients` and `subscriptions` without any locking. Confirm
//! whether the chosen `std.Io` implementation can run handlers in parallel; if
//! it can, these maps need synchronization.

const std = @import("std");
const Message = @import("./message_parser.zig").Message;
pub const ServerInfo = Message.ServerInfo;

const ClientState = @import("./client.zig").ClientState;
const Server = @This();

/// Configuration advertised to every client during the handshake.
info: ServerInfo,
/// Connected clients keyed by connection ID. The pointed-to state lives on the
/// stack of the owning `handleConnection` call, so an entry must be removed
/// before that call returns.
clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
/// Map of subjects to a map of (client ID => SID)
subscriptions: std.StringHashMapUnmanaged(std.AutoHashMapUnmanaged(usize, []const u8)) = .empty,

/// Listens on `server_config.host`:`server_config.port` and starts one
/// concurrent handler per accepted connection.
///
/// Loops forever; it only returns when parsing the address, listening, or
/// accepting a connection fails.
pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
    var server: Server = .{
        .info = server_config,
    };

    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();

    var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
        server.info.host,
        server.info.port,
    ), io, .{});
    defer tcp_server.deinit(io);

    // IDs wrap around on overflow; an ID still held by a live client is skipped.
    var id: usize = 0;
    while (true) : (id +%= 1) {
        std.debug.print("in server loop\n", .{});
        if (server.clients.contains(id)) continue;

        const stream = try tcp_server.accept(io);
        std.debug.print("accepted connection\n", .{});
        _ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch {
            std.debug.print("could not start concurrent handler for {d}\n", .{id});
            stream.close(io);
        };
    }
}

/// Registers `client` under `id`, replacing any previous entry for that ID.
/// The server stores the pointer only; the caller keeps ownership of the state.
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *ClientState) !void {
    try server.clients.put(allocator, id, client);
}

/// Forgets the client registered under `id` and every subscription it holds.
///
/// Must run before the client's state goes out of scope, otherwise
/// `publishMessage` could follow a dangling pointer. Unknown IDs are ignored.
fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void {
    _ = server.clients.remove(id);

    var subjects = server.subscriptions.iterator();
    while (subjects.next()) |entry| {
        const subscribers = entry.value_ptr;
        if (!subscribers.remove(id)) continue;
        // Release the storage of a now-empty subscriber map. The subject entry
        // itself is kept (as an empty map) so the outer map is not modified
        // while it is being iterated.
        if (subscribers.count() == 0) subscribers.clearAndFree(allocator);
    }
}

/// Serves a single connection: writes the server info, expects a connect
/// message, then processes client messages until the client is done.
///
/// The stream is always closed on exit. Failures of the handshake end the
/// connection quietly; processing errors are logged and end only this
/// connection.
fn handleConnection(
    server: *Server,
    allocator: std.mem.Allocator,
    io: std.Io,
    id: usize,
    stream: std.Io.net.Stream,
) !void {
    defer stream.close(io);

    var w_buffer: [1024]u8 = undefined;
    var writer = stream.writer(io, &w_buffer);
    const out = &writer.interface;

    var r_buffer: [8192]u8 = undefined;
    var reader = stream.reader(io, &r_buffer);
    const in = &reader.interface;

    @import("./client.zig").writeInfo(out, server.info) catch return;

    var connect_arena: std.heap.ArenaAllocator = .init(allocator);
    defer connect_arena.deinit();

    // The peer controls the first message, so check the tag instead of
    // assuming it: reading the wrong union field is a panic in safe builds and
    // undefined behavior in release builds.
    const first = Message.next(connect_arena.allocator(), in) catch return;
    const connect = switch (first) {
        .connect => |c| c,
        else => {
            std.debug.print("client {d} did not open with a connect message; closing\n", .{id});
            return;
        },
    };

    var client_state: ClientState = try .init(io, allocator, id, connect, in, out);
    // `processClient` deinitializes the state itself and its errors are caught
    // below, so this only fires when registering the client fails.
    errdefer client_state.deinit(allocator);

    try server.addClient(allocator, id, &client_state);
    defer server.removeClient(allocator, id);

    server.processClient(allocator, io, &client_state) catch |err| {
        // One misbehaving connection must not take the whole server down.
        std.debug.print("error processing client {d}: {s}\n", .{ id, @errorName(err) });
    };
}

/// Delivers `msg` to every client subscribed to exactly `msg.subject`.
///
/// Subscribers whose client is no longer registered are skipped. An error
/// from sending to any subscriber is returned to the caller.
///
/// TODO: Publish to pattern matching subjects.
fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
    const subs = server.subscriptions.get(msg.subject) orelse {
        std.debug.print("no subs on {s}\n", .{msg.subject});
        return;
    };

    var subs_iter = subs.iterator();
    while (subs_iter.next()) |sub| {
        const client_id = sub.key_ptr.*;
        const client = server.clients.get(client_id) orelse {
            std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{client_id});
            continue;
        };

        _ = try client.send(io, .{ .msg = .{
            .subject = msg.subject,
            .sid = sub.value_ptr.*,
            .reply_to = msg.reply_to,
            .payload = msg.payload,
        } });
    }
}

/// Records that client `id` is subscribed to `msg.subject` with `msg.sid`,
/// replacing that client's previous SID for the subject if there was one.
///
/// The existing subscriber map is updated in place, so an allocation failure
/// cannot drop the subject's other subscribers.
///
/// NOTE(review): `msg.subject` and `msg.sid` are stored without copying; this
/// assumes they outlive the subscription - confirm against the message parser.
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
    const entry = try server.subscriptions.getOrPut(gpa, msg.subject);
    if (!entry.found_existing) entry.value_ptr.* = .empty;
    try entry.value_ptr.put(gpa, id, msg.sid);
}

/// Reads messages from `client_state` and acts on them until the client
/// signals end of stream or reading fails.
///
/// Always deinitializes `client_state` before returning. Returns
/// `error.UnsupportedMessage` for message types that are not handled yet, and
/// forwards errors from sending, publishing, and subscribing.
pub fn processClient(server: *Server, gpa: std.mem.Allocator, io: std.Io, client_state: *ClientState) !void {
    defer std.debug.print("done processing client??\n", .{});
    defer client_state.deinit(gpa);

    std.debug.print("processing client: {d}\n", .{client_state.id});

    std.debug.print("awaiting next message from client\n", .{});
    while (client_state.next(io)) |msg| {
        std.debug.print("message from client!: {any}\n", .{msg});
        switch (msg) {
            .ping => {
                std.debug.print("got a ping! sending a pong.\n", .{});
                // `send` reports whether the message was accepted; retry a few times.
                for (0..5) |_| {
                    if (try client_state.send(io, .pong)) {
                        std.debug.print("sent pong\n", .{});
                        break;
                    }
                    std.debug.print("trying to send a pong again.\n", .{});
                } else {
                    std.debug.print("could not pong to client {d}\n", .{client_state.id});
                }
            },
            .@"pub" => |@"pub"| {
                std.debug.print("pub: {any}\n", .{@"pub"});
                try server.publishMessage(io, @"pub");
                if (client_state.connect.connect.verbose) {
                    _ = try client_state.send(io, .@"+ok");
                }
            },
            .sub => |sub| {
                try server.subscribe(gpa, client_state.id, sub);
            },
            .eos => {
                client_state.io_group.wait(io);
                break;
            },
            else => |unsupported| {
                // Peers choose what they send; reject instead of panicking so a
                // single client cannot crash the server.
                std.debug.print("Unimplemented message: {any}\n", .{unsupported});
                return error.UnsupportedMessage;
            },
        }

        std.debug.print("processed message from client\n", .{});
        std.debug.print("awaiting next message from client\n", .{});
    } else |err| {
        std.debug.print("client {d} stopped producing messages: {s}\n", .{ client_state.id, @errorName(err) });
    }
}

/// Returns the server ID. Currently a fixed placeholder string.
pub fn createId() []const u8 {
    return "SERVERID";
}

/// Returns the server name. Currently a fixed placeholder string.
pub fn createName() []const u8 {
    return "SERVERNAME";
}

// TODO: add tests that build a `Server` and a `ClientState` on top of
// `std.testing.allocator` and in-memory reader/writer pairs.