diff options
Diffstat (limited to 'src/server/Client.zig')
| -rw-r--r-- | src/server/Client.zig | 150 |
1 files changed, 92 insertions, 58 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig index 1e93360..8ff92e8 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -3,10 +3,16 @@ const std = @import("std"); const Client = @This(); +pub const Msgs = union(enum) { + MSG: Message.Msg, + HMSG: Message.HMsg, +}; + connect: ?Message.Connect, // Messages for this client to receive. recv_queue: *std.Io.Queue(Message), +msg_queue: *std.Io.Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, @@ -14,12 +20,14 @@ to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, recv_queue: *std.Io.Queue(Message), + msg_queue: *std.Io.Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, .recv_queue = recv_queue, + .msg_queue = msg_queue, .from_client = in, .to_client = out, }; @@ -33,72 +41,98 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { } pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { - var msgs: [256]Message = undefined; + var msgs_buf: [1024]Msgs = undefined; + + var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable"); + errdefer _ = recv_msgs_task.cancel(io) catch {}; + + var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { - const len = try self.recv_queue.get(io, &msgs, 1); - std.debug.assert(len <= msgs.len); - for (0..len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(alloc), - .HMSG => |h| h.deinit(alloc), - else => {}, - }; - errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(alloc); - }, - else => {}, - }; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .MSG => |m| { - @branchHint(.likely); - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - m.payload, + switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + .msgs => |msgs_err| { + @branchHint(.likely); + defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable; + const msgs = try msgs_err; + for (0..msgs.len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .MSG => |m| m.deinit(alloc), + .HMSG => |h| h.deinit(alloc), + }; + errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + .MSG => |m| { + m.deinit(alloc); }, - ); - }, - .HMSG => |hmsg| { - @branchHint(.likely); - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - hmsg.msg.payload, - }); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } + .HMSG => |h| { + h.deinit(alloc); + }, + }; + switch (msg) { + .MSG => |m| { + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + m.payload, + }, + ); + }, + .HMSG => |hmsg| { + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + hmsg.msg.payload, + }); + }, + } + } + }, + .proto => |msg_err| { + @branchHint(.unlikely); + defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + const msg = try msg_err; + switch (msg) { + .@"+OK" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .PONG => { + _ = try self.to_client.write("PONG\r\n"); + }, + .INFO => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .@"-ERR" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + }, } try self.to_client.flush(); } } +fn recvProtoMsg(self: *Client, io: std.Io) !Message { + return self.recv_queue.getOne(io); +} + +fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs { + const len = try self.msg_queue.get(io, buf, 1); + return buf[0..len]; +} + pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } |
