diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-02 02:13:13 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-02 02:13:13 +0000 |
| commit | 1e3c21f150331169ae5dabf61c89423810676f33 (patch) | |
| tree | f5904fc933768d9878409ad489fde86e0e6c1699 /src/server | |
| parent | 9ee8317cb092630ae00b736af9f6f3e7c4ea46c7 (diff) | |
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/client.zig | 136 | ||||
| -rw-r--r-- | src/server/main.zig | 22 |
2 files changed, 102 insertions, 56 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index ff7e5c1..0f7fa2c 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -5,7 +5,9 @@ const Client = @This(); connect: ?Message.Connect, -// Messages for this client to receive. +// Messages to send to this client. +send_queue: ?*std.Io.Queue(Message) = null, +// Messages received from this client. recv_queue: ?*std.Io.Queue(Message) = null, from_client: *std.Io.Reader, @@ -23,70 +25,102 @@ pub fn init( }; } -pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { - self.recv_queue = queue; +pub fn start( + self: *Client, + io: std.Io, + alloc: std.mem.Allocator, + send_queue: *std.Io.Queue(Message), + recv_queue: *std.Io.Queue(Message), +) !void { + self.send_queue = send_queue; + self.recv_queue = recv_queue; + + var recvL = try io.concurrent(startSendLoop, .{ self, io, alloc }); + defer recvL.cancel(io) catch {}; + + var sendL = try io.concurrent(startRecvLoop, .{ self, io, alloc }); + defer sendL.cancel(io) catch {}; + + // Wait for one of the tasks to cancel. + _ = try io.select(.{ + .recv = &recvL, + .send = &sendL, + }); +} + +fn startSendLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { var msgs: [8]Message = undefined; - while (true) { - const len = try queue.get(io, &msgs, 1); - std.debug.assert(len <= msgs.len); - for (0..len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .msg => |m| m.deinit(alloc), - else => {}, - }; - errdefer { - for (msgs[i + 1 .. len]) |mg| switch (mg) { - .msg => |m| { - m.deinit(alloc); - }, + if (self.send_queue) |queue| { + while (true) { + const len = try queue.get(io, &msgs, 1); + std.debug.assert(len <= msgs.len); + for (0..len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .msg => |m| m.deinit(alloc), else => {}, }; - } - switch (msg) { - .@"+ok" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .pong => { - _ = try self.to_client.write("PONG\r\n"); - }, - .info => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .msg => |m| { - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - m.payload, + errdefer { + for (msgs[i + 1 .. len]) |mg| switch (mg) { + .msg => |m| { + m.deinit(alloc); }, - ); - }, - .@"-err" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, + else => {}, + }; + } + switch (msg) { + .@"+ok" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .pong => { + _ = try self.to_client.write("PONG\r\n"); + }, + .info => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .msg => |m| { + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + m.payload, + }, + ); + }, + .@"-err" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } } + try self.to_client.flush(); } - try self.to_client.flush(); - } + } else unreachable; } -pub fn send(self: *Client, io: std.Io, msg: Message) !void { +fn startRecvLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { if (self.recv_queue) |queue| { + while (Message.next(alloc, self.from_client)) |msg| { + try queue.putOne(io, msg); + } else |_| {} + } else unreachable; +} + +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + if (self.send_queue) |queue| { try queue.putOne(io, msg); } else @panic("Must start() the client before sending it messages."); } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { - return Message.next(allocator, self.from_client); + return; } test { diff --git a/src/server/main.zig b/src/server/main.zig index 044f551..c24a758 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -160,11 +160,11 @@ fn handleConnection( try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); - var qbuf: [8]Message = undefined; - var queue: std.Io.Queue(Message) = .init(&qbuf); + var sdqbuf: [8]Message = undefined; + var send_queue: std.Io.Queue(Message) = .init(&sdqbuf); defer { - queue.close(io); - while (queue.getOne(io)) |msg| { + send_queue.close(io); + while (send_queue.getOne(io)) |msg| { switch (msg) { .msg => |m| m.deinit(server_allocator), else => {}, @@ -172,7 +172,19 @@ fn handleConnection( } else |_| {} } - var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); + var rcqbuf: [8]Message = undefined; + var recv_queue: std.Io.Queue(Message) = .init(&rcqbuf); + defer { + recv_queue.close(io); + while (recv_queue.getOne(io)) |msg| { + switch (msg) { + .msg => |m| m.deinit(server_allocator), + else => {}, + } + } else |_| {} + } + + var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &send_queue, &recv_queue }); defer client_task.cancel(io) catch {}; try io.sleep(std.Io.Duration.fromMilliseconds(5), .real); |
