diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-09 18:22:41 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-12-09 18:22:41 -0500 |
| commit | 45e8c4cc0824138b0433f789b4a0a21e77385aef (patch) | |
| tree | 972b0a780f340f8316ac0d3a1479ac42d2eecb2a /src | |
| parent | 30cc0170813500f73f897d846052cb1aeaa58d52 (diff) | |
sending errors on pub sub!
Diffstat (limited to 'src')
| -rw-r--r-- | src/server/client.zig | 30 | ||||
| -rw-r--r-- | src/server/main.zig | 10 |
2 files changed, 23 insertions, 17 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 05f7859..d6ccacf 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -6,7 +6,7 @@ pub const ClientState = struct { connect: Message.AllocatedConnect, /// Messages that this client should receive. - recv_queue: std.Io.Queue(Message), + recv_queue: *std.Io.Queue(Message), recv_queue_buffer: [1024]Message = undefined, from_client: *std.Io.Reader, @@ -16,6 +16,7 @@ pub const ClientState = struct { pub fn init( io: std.Io, + allocator: std.mem.Allocator, id: usize, connect: Message.AllocatedConnect, in: *std.Io.Reader, @@ -24,13 +25,13 @@ pub const ClientState = struct { var res: ClientState = .{ .id = id, .connect = connect, - .recv_queue = undefined, + .recv_queue = try allocator.create(std.Io.Queue(Message)), .from_client = in, .to_client = out, .task = undefined, }; - res.recv_queue = .init(&res.recv_queue_buffer); - res.task = try io.concurrent(processWrite, .{ &res, io, out }); + res.recv_queue.* = .init(&res.recv_queue_buffer); + res.task = try io.concurrent(processWrite, .{ &res, io }); return res; } @@ -38,23 +39,24 @@ pub const ClientState = struct { fn processWrite( self: *ClientState, io: std.Io, - out: *std.Io.Writer, ) void { + std.debug.print("out pointer in write proc: {*}\n", .{self.to_client}); + std.debug.print("recv queue in write proc: {*}\n", .{&self.recv_queue}); while (true) { const message = self.recv_queue.getOne(io) catch continue; std.debug.print("got message in write loop to send to client: {any}\n", .{message}); switch (message) { .@"+ok" => { - writeOk(out) catch break; + writeOk(self.to_client) catch break; }, .pong => { - writePong(out) catch break; + writePong(self.to_client) catch break; }, .info => |info| { - writeInfo(out, info) catch break; + writeInfo(self.to_client, info) catch break; }, .msg => |m| { - writeMsg(out, m) catch break; + writeMsg(self.to_client, m) catch break; }, else => { std.debug.panic("unimplemented write", .{}); @@ -63,9 +65,10 @@ pub const ClientState = struct { } } - pub fn deinit(self: *ClientState, io: std.Io) void { + pub fn deinit(self: *ClientState, io: std.Io, allocator: std.mem.Allocator) void { self.task.cancel(io); self.connect.deinit(); + allocator.destroy(self.recv_queue); } /// Return true if the value was put in the clients buffer to process, else false. @@ -89,11 +92,10 @@ fn writeOk(out: *std.Io.Writer) !void { } pub fn writePong(out: *std.Io.Writer) !void { + std.debug.print("out pointer: {*}\n", .{out}); std.debug.print("writing pong\n", .{}); - for (0..10000) |_| { - _ = try out.write("PONG\r\n"); - try out.flush(); - } + _ = try out.write("PONG\r\n"); + try out.flush(); } pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { diff --git a/src/server/main.zig b/src/server/main.zig index a732525..2ebf96d 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -63,6 +63,8 @@ fn handleConnection( var writer = stream.writer(io, &w_buffer); const out = &writer.interface; + std.debug.print("out pointer in client handler: {*}\n", .{out}); + var r_buffer: [8192]u8 = undefined; var reader = stream.reader(io, &r_buffer); const in = &reader.interface; @@ -71,8 +73,8 @@ fn handleConnection( var connect_arena: std.heap.ArenaAllocator = .init(allocator); defer connect_arena.deinit(); const connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - var client_state: ClientState = try .init(io, id, connect, in, out); - defer client_state.deinit(io); + var client_state: ClientState = try .init(io, allocator, id, connect, in, out); + defer client_state.deinit(io, allocator); try server.addClient(allocator, id, &client_state); defer server.removeClient(allocator, id); @@ -107,7 +109,9 @@ fn handleConnection( switch (msg) { .ping => { std.debug.print("got a ping! sending a pong.\n", .{}); - @import("./client.zig").writePong(out) catch return; + + std.debug.print("recv queue in server loop: {*}\n", .{&client_state.recv_queue}); + // @import("./client.zig").writePong(out) catch return; for (0..5) |_| { if (try client_state.send(io, .pong)) { std.debug.print("sent pong\n", .{}); |
