summaryrefslogtreecommitdiff
path: root/src/server/Client.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-05 13:56:31 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-05 13:56:40 -0500
commit1d2af4a69a31174263481588ee8c7a53375a12e8 (patch)
treef9975dcc8c87322defc7d98bae010ec8a41aa506 /src/server/Client.zig
parent80d14f7303fd1dd577d74bfd81fb28e159fd5e79 (diff)
Simplified queue access
Also correctly move resetting the task to the end instead of defer. We don't want to reset the task in the case of an error, so shouldn't use defer.
Diffstat (limited to 'src/server/Client.zig')
-rw-r--r--src/server/Client.zig30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
index 4eceb89..d099ecb 100644
--- a/src/server/Client.zig
+++ b/src/server/Client.zig
@@ -1,5 +1,6 @@
const Message = @import("message_parser.zig").Message;
const std = @import("std");
+const Queue = std.Io.Queue;
const Client = @This();
@@ -11,16 +12,16 @@ pub const Msgs = union(enum) {
connect: ?Message.Connect,
// Messages for this client to receive.
-recv_queue: *std.Io.Queue(Message),
-msg_queue: *std.Io.Queue(Msgs),
+recv_queue: *Queue(Message),
+msg_queue: *Queue(Msgs),
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
- recv_queue: *std.Io.Queue(Message),
- msg_queue: *std.Io.Queue(Msgs),
+ recv_queue: *Queue(Message),
+ msg_queue: *Queue(Msgs),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
@@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
var msgs_buf: [1024]Msgs = undefined;
- var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable");
+ var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
errdefer _ = recv_msgs_task.cancel(io) catch {};
- var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
+ var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
errdefer _ = recv_proto_task.cancel(io) catch {};
while (true) {
switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
- .msgs => |msgs_err| {
+ .msgs => |len_err| {
@branchHint(.likely);
- defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable;
- const msgs = try msgs_err;
+ const msgs = msgs_buf[0..try len_err];
for (0..msgs.len) |i| {
const msg = msgs[i];
defer switch (msg) {
@@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
},
}
}
+ recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
},
.proto => |msg_err| {
@branchHint(.unlikely);
- defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
const msg = try msg_err;
switch (msg) {
.@"+OK" => {
@@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
}
+ recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
},
}
try self.to_client.flush();
}
}
-fn recvProtoMsg(self: *Client, io: std.Io) !Message {
- return self.recv_queue.getOne(io);
-}
-
-fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs {
- const len = try self.msg_queue.get(io, buf, 1);
- return buf[0..len];
-}
-
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
try self.recv_queue.putOne(io, msg);
}