diff options
| -rw-r--r-- | src/server/Client.zig | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig index 4eceb89..d099ecb 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -1,5 +1,6 @@ const Message = @import("message_parser.zig").Message; const std = @import("std"); +const Queue = std.Io.Queue; const Client = @This(); @@ -11,16 +12,16 @@ pub const Msgs = union(enum) { connect: ?Message.Connect, // Messages for this client to receive. -recv_queue: *std.Io.Queue(Message), -msg_queue: *std.Io.Queue(Msgs), +recv_queue: *Queue(Message), +msg_queue: *Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - recv_queue: *std.Io.Queue(Message), - msg_queue: *std.Io.Queue(Msgs), + recv_queue: *Queue(Message), + msg_queue: *Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { @@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { var msgs_buf: [1024]Msgs = undefined; - var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable"); + var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); errdefer _ = recv_msgs_task.cancel(io) catch {}; - var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |msgs_err| { + .msgs => |len_err| { @branchHint(.likely); - defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable; - const msgs = try msgs_err; + const msgs = msgs_buf[0..try len_err]; for (0..msgs.len) |i| { const msg = msgs[i]; defer switch (msg) { @@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { }, } } + recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; }, .proto => |msg_err| { @branchHint(.unlikely); - defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; const msg = try msg_err; switch (msg) { .@"+OK" => { @@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { std.debug.panic("unimplemented write: {any}\n", .{m}); }, } + recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; }, } try self.to_client.flush(); } } -fn recvProtoMsg(self: *Client, io: std.Io) !Message { - return self.recv_queue.getOne(io); -} - -fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs { - const len = try self.msg_queue.get(io, buf, 1); - return buf[0..len]; -} - pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } |
