From 1d2af4a69a31174263481588ee8c7a53375a12e8 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Mon, 5 Jan 2026 13:56:31 -0500 Subject: Simplified queue access Also correctly move resetting the task to the end instead of defer. We don't want to reset the task in the case of an error, so shouldn't use defer. --- src/server/Client.zig | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) (limited to 'src/server/Client.zig') diff --git a/src/server/Client.zig b/src/server/Client.zig index 4eceb89..d099ecb 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -1,5 +1,6 @@ const Message = @import("message_parser.zig").Message; const std = @import("std"); +const Queue = std.Io.Queue; const Client = @This(); @@ -11,16 +12,16 @@ pub const Msgs = union(enum) { connect: ?Message.Connect, // Messages for this client to receive. -recv_queue: *std.Io.Queue(Message), -msg_queue: *std.Io.Queue(Msgs), +recv_queue: *Queue(Message), +msg_queue: *Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - recv_queue: *std.Io.Queue(Message), - msg_queue: *std.Io.Queue(Msgs), + recv_queue: *Queue(Message), + msg_queue: *Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { @@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { var msgs_buf: [1024]Msgs = undefined; - var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable"); + var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); errdefer _ = recv_msgs_task.cancel(io) catch {}; - var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |msgs_err| { + .msgs => |len_err| { @branchHint(.likely); - defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable; - const msgs = try msgs_err; + const msgs = msgs_buf[0..try len_err]; for (0..msgs.len) |i| { const msg = msgs[i]; defer switch (msg) { @@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { }, } } + recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; }, .proto => |msg_err| { @branchHint(.unlikely); - defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; const msg = try msg_err; switch (msg) { .@"+OK" => { @@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { std.debug.panic("unimplemented write: {any}\n", .{m}); }, } + recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; }, } try self.to_client.flush(); } } -fn recvProtoMsg(self: *Client, io: std.Io) !Message { - return self.recv_queue.getOne(io); -} - -fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs { - const len = try self.msg_queue.get(io, buf, 1); - return buf[0..len]; -} - pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } -- cgit