summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/Client.zig30
1 files changed, 11 insertions, 19 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
index 4eceb89..d099ecb 100644
--- a/src/server/Client.zig
+++ b/src/server/Client.zig
@@ -1,5 +1,6 @@
const Message = @import("message_parser.zig").Message;
const std = @import("std");
+const Queue = std.Io.Queue;
const Client = @This();
@@ -11,16 +12,16 @@ pub const Msgs = union(enum) {
connect: ?Message.Connect,
// Messages for this client to receive.
-recv_queue: *std.Io.Queue(Message),
-msg_queue: *std.Io.Queue(Msgs),
+recv_queue: *Queue(Message),
+msg_queue: *Queue(Msgs),
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
- recv_queue: *std.Io.Queue(Message),
- msg_queue: *std.Io.Queue(Msgs),
+ recv_queue: *Queue(Message),
+ msg_queue: *Queue(Msgs),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
@@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
var msgs_buf: [1024]Msgs = undefined;
- var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable");
+ var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
errdefer _ = recv_msgs_task.cancel(io) catch {};
- var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
+ var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
errdefer _ = recv_proto_task.cancel(io) catch {};
while (true) {
switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
- .msgs => |msgs_err| {
+ .msgs => |len_err| {
@branchHint(.likely);
- defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable;
- const msgs = try msgs_err;
+ const msgs = msgs_buf[0..try len_err];
for (0..msgs.len) |i| {
const msg = msgs[i];
defer switch (msg) {
@@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
},
}
}
+ recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
},
.proto => |msg_err| {
@branchHint(.unlikely);
- defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
const msg = try msg_err;
switch (msg) {
.@"+OK" => {
@@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
}
+ recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
},
}
try self.to_client.flush();
}
}
-fn recvProtoMsg(self: *Client, io: std.Io) !Message {
- return self.recv_queue.getOne(io);
-}
-
-fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs {
- const len = try self.msg_queue.get(io, buf, 1);
- return buf[0..len];
-}
-
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
try self.recv_queue.putOne(io, msg);
}