diff options
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 257 |
1 files changed, 129 insertions, 128 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index 155a455..a732525 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -71,7 +71,8 @@ fn handleConnection( var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = try .init(io, allocator, id, connect, in, out); + var client_state: ClientState = try .init(io, id, connect, in, out); + defer client_state.deinit(io); try server.addClient(allocator, id, &client_state); defer server.removeClient(allocator, id); @@ -96,9 +97,133 @@ fn handleConnection( // server.subscriptions.unlockPointers(); // } - server.processClient(allocator, io, &client_state) catch |err| { - std.debug.panic("Error processing client: {}\n", .{err}); - }; + { + defer std.debug.print("done processing client??\n", .{}); + std.debug.print("processing client: {d}\n", .{client_state.id}); + + std.debug.print("awaiting next message from client\n", .{}); + while (client_state.next(allocator)) |msg| { + std.debug.print("message from client!: {any}\n", .{msg}); + switch (msg) { + .ping => { + std.debug.print("got a ping! sending a pong.\n", .{}); + @import("./client.zig").writePong(out) catch return; + for (0..5) |_| { + if (try client_state.send(io, .pong)) { + std.debug.print("sent pong\n", .{}); + break; + } + std.debug.print("trying to send a pong again.\n", .{}); + } else { + std.debug.print("could not pong to client {d}\n", .{client_state.id}); + } + }, + .@"pub" => |@"pub"| { + std.debug.print("pub: {any}\n", .{@"pub"}); + try server.publishMessage(io, @"pub"); + if (client_state.connect.connect.verbose) { + std.debug.print("server IS sending +ok\n", .{}); + _ = try client_state.send(io, .@"+ok"); + } else { + std.debug.print("server NOT sending +ok\n", .{}); + } + }, + .sub => |sub| { + try server.subscribe(allocator, client_state.id, sub); + }, + .eos => { + break; + }, + else => |e| { + std.debug.panic("Unimplemented message: {any}\n", .{e}); + }, + } + + std.debug.print("processed message from client\n", .{}); + std.debug.print("awaiting next message from client\n", .{}); + } else |_| {} + + // while (!io.cancelRequested()) { + // if (client_state.send_queue.getOne(io)) |msg| { + // switch (msg) { + // // Respond to ping with pong. + // .ping => { + // try client_state.recv_queue.putOne(io, .pong); + // }, + // .@"pub" => |p| { + // std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); + // std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); + // std.debug.print("subs subjects:\n", .{}); + // var key_iter = server.subscriptions.keyIterator(); + // while (key_iter.next()) |k| { + // std.debug.print("- {s}\n", .{k.*}); + // } else std.debug.print("<none>\n", .{}); + // std.debug.print("pub subject: '{s}'\n", .{p.subject}); + // std.debug.print("pub: {any}\n", .{p}); + // errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; + // // Just publishing to exact matches right now. + // // TODO: Publish to pattern matching subjects. + // if (server.subscriptions.get(p.subject)) |subs| { + // var subs_iter = subs.iterator(); + // while (subs_iter.next()) |sub| { + // var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); + // std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); + // if ((try client.recv_queue.put( + // io, + // &.{ + // .{ + // .msg = .{ + // .subject = p.subject, + // .sid = sub.value_ptr.*, + // .reply_to = p.reply_to, + // .payload = p.payload, + // }, + // }, + // }, + // 0, + // )) > 0) { + // std.debug.print("published message!\n", .{}); + // } else { + // std.debug.print("skipped publishing for some reason\n", .{}); + // } + // } + // try client_state.recv_queue.putOne(io, .@"+ok"); + // } else { + // std.debug.print("no subs with subject\n", .{}); + // } + // }, + // .sub => |s| { + // var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); + // if (!subscribers.found_existing) { + // subscribers.value_ptr.* = .empty; + // } + // try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); + + // std.debug.print("subs: {any}\n", .{server.subscriptions}); + // }, + // .info => |info| { + // std.debug.panic("got an info message? : {any}\n", .{info}); + // }, + // else => |m| { + // std.debug.panic("Unimplemented: {any}\n", .{m}); + // }, + // } + // } else |err| return err; + // } + + // while (true) { + // switch (next_message) { + // .connect => |connect| { + // std.debug.panic("Connection message after already connected: {any}\n", .{connect}); + // }, + // .ping => try writePong(out), + // .@"pub" => try writeOk(out), + // else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), + // } + // } + } + + client_state.task.await(io); } // // Result is owned by the caller @@ -138,130 +263,6 @@ fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Su try server.subscriptions.put(gpa, msg.subject, subs_for_subject); } -pub fn processClient(server: *Server, gpa: std.mem.Allocator, io: std.Io, client_state: *ClientState) !void { - defer std.debug.print("done processing client??\n", .{}); - defer client_state.deinit(gpa); - std.debug.print("processing client: {d}\n", .{client_state.id}); - - std.debug.print("awaiting next message from client\n", .{}); - while (client_state.next(io)) |msg| { - std.debug.print("message from client!: {any}\n", .{msg}); - switch (msg) { - .ping => { - std.debug.print("got a ping! sending a pong.\n", .{}); - for (0..5) |_| { - if (try client_state.send(io, .pong)) { - std.debug.print("sent pong\n", .{}); - break; - } - std.debug.print("trying to send a pong again.\n", .{}); - } else { - std.debug.print("could not pong to client {d}\n", .{client_state.id}); - } - }, - .@"pub" => |@"pub"| { - std.debug.print("pub: {any}\n", .{@"pub"}); - try server.publishMessage(io, @"pub"); - if (client_state.connect.connect.verbose) { - _ = try client_state.send(io, .@"+ok"); - } - }, - .sub => |sub| { - try server.subscribe(gpa, client_state.id, sub); - }, - .eos => { - client_state.io_group.wait(io); - break; - }, - else => |e| { - std.debug.panic("Unimplemented message: {any}\n", .{e}); - }, - } - - std.debug.print("processed message from client\n", .{}); - std.debug.print("awaiting next message from client\n", .{}); - } else |_| {} - - // while (!io.cancelRequested()) { - // if (client_state.send_queue.getOne(io)) |msg| { - // switch (msg) { - // // Respond to ping with pong. - // .ping => { - // try client_state.recv_queue.putOne(io, .pong); - // }, - // .@"pub" => |p| { - // std.debug.print("subs (in pub): {any}\n", .{server.subscriptions}); - // std.debug.print("subs size: {d}\n", .{server.subscriptions.size}); - // std.debug.print("subs subjects:\n", .{}); - // var key_iter = server.subscriptions.keyIterator(); - // while (key_iter.next()) |k| { - // std.debug.print("- {s}\n", .{k.*}); - // } else std.debug.print("<none>\n", .{}); - // std.debug.print("pub subject: '{s}'\n", .{p.subject}); - // std.debug.print("pub: {any}\n", .{p}); - // errdefer _ = client_state.recv_queue.put(io, &.{.@"-err"}, 1) catch {}; - // // Just publishing to exact matches right now. - // // TODO: Publish to pattern matching subjects. - // if (server.subscriptions.get(p.subject)) |subs| { - // var subs_iter = subs.iterator(); - // while (subs_iter.next()) |sub| { - // var client = server.clients.get(sub.key_ptr.*) orelse std.debug.panic("Trying to pub to a client that doesn't exist!\n", .{}); - // std.debug.print("{d} is pubbing to {d}\n", .{ client_state.id, client.id }); - // if ((try client.recv_queue.put( - // io, - // &.{ - // .{ - // .msg = .{ - // .subject = p.subject, - // .sid = sub.value_ptr.*, - // .reply_to = p.reply_to, - // .payload = p.payload, - // }, - // }, - // }, - // 0, - // )) > 0) { - // std.debug.print("published message!\n", .{}); - // } else { - // std.debug.print("skipped publishing for some reason\n", .{}); - // } - // } - // try client_state.recv_queue.putOne(io, .@"+ok"); - // } else { - // std.debug.print("no subs with subject\n", .{}); - // } - // }, - // .sub => |s| { - // var subscribers = try server.subscriptions.getOrPut(gpa, s.subject); - // if (!subscribers.found_existing) { - // subscribers.value_ptr.* = .empty; - // } - // try subscribers.value_ptr.*.put(gpa, client_state.id, s.sid); - - // std.debug.print("subs: {any}\n", .{server.subscriptions}); - // }, - // .info => |info| { - // std.debug.panic("got an info message? : {any}\n", .{info}); - // }, - // else => |m| { - // std.debug.panic("Unimplemented: {any}\n", .{m}); - // }, - // } - // } else |err| return err; - // } - - // while (true) { - // switch (next_message) { - // .connect => |connect| { - // std.debug.panic("Connection message after already connected: {any}\n", .{connect}); - // }, - // .ping => try writePong(out), - // .@"pub" => try writeOk(out), - // else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), - // } - // } -} - pub fn createId() []const u8 { return "SERVERID"; } |
