diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-05 13:56:31 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-05 13:56:40 -0500 |
| commit | 1d2af4a69a31174263481588ee8c7a53375a12e8 (patch) | |
| tree | f9975dcc8c87322defc7d98bae010ec8a41aa506 /src/server | |
| parent | 80d14f7303fd1dd577d74bfd81fb28e159fd5e79 (diff) | |
Simplified queue access
Also correctly move resetting the task to the end instead of defer.
We don't want to reset the task in the case of an error, so shouldn't
use defer.
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/Client.zig | 30 |
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig index 4eceb89..d099ecb 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -1,5 +1,6 @@ const Message = @import("message_parser.zig").Message; const std = @import("std"); +const Queue = std.Io.Queue; const Client = @This(); @@ -11,16 +12,16 @@ pub const Msgs = union(enum) { connect: ?Message.Connect, // Messages for this client to receive. -recv_queue: *std.Io.Queue(Message), -msg_queue: *std.Io.Queue(Msgs), +recv_queue: *Queue(Message), +msg_queue: *Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - recv_queue: *std.Io.Queue(Message), - msg_queue: *std.Io.Queue(Msgs), + recv_queue: *Queue(Message), + msg_queue: *Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { @@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { var msgs_buf: [1024]Msgs = undefined; - var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable"); + var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); errdefer _ = recv_msgs_task.cancel(io) catch {}; - var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |msgs_err| { + .msgs => |len_err| { @branchHint(.likely); - defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable; - const msgs = try msgs_err; + const msgs = msgs_buf[0..try len_err]; for (0..msgs.len) |i| { const msg = msgs[i]; defer switch (msg) { @@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { }, } } + recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; }, .proto => |msg_err| { @branchHint(.unlikely); - defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; const msg = try msg_err; switch (msg) { .@"+OK" => { @@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { std.debug.panic("unimplemented write: {any}\n", .{m}); }, } + recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; }, } try self.to_client.flush(); } } -fn recvProtoMsg(self: *Client, io: std.Io) !Message { - return self.recv_queue.getOne(io); -} - -fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs { - const len = try self.msg_queue.get(io, buf, 1); - return buf[0..len]; -} - pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } |
