summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/client.zig136
-rw-r--r--src/server/main.zig22
2 files changed, 102 insertions, 56 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index ff7e5c1..0f7fa2c 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -5,7 +5,9 @@ const Client = @This();
connect: ?Message.Connect,
-// Messages for this client to receive.
+// Messages to send to this client.
+send_queue: ?*std.Io.Queue(Message) = null,
+// Messages received from this client.
recv_queue: ?*std.Io.Queue(Message) = null,
from_client: *std.Io.Reader,
@@ -23,70 +25,102 @@ pub fn init(
};
}
-pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void {
- self.recv_queue = queue;
+pub fn start(
+ self: *Client,
+ io: std.Io,
+ alloc: std.mem.Allocator,
+ send_queue: *std.Io.Queue(Message),
+ recv_queue: *std.Io.Queue(Message),
+) !void {
+ self.send_queue = send_queue;
+ self.recv_queue = recv_queue;
+
+ var recvL = try io.concurrent(startSendLoop, .{ self, io, alloc });
+ defer recvL.cancel(io) catch {};
+
+ var sendL = try io.concurrent(startRecvLoop, .{ self, io, alloc });
+ defer sendL.cancel(io) catch {};
+
+ // Wait for one of the tasks to cancel.
+ _ = try io.select(.{
+ .recv = &recvL,
+ .send = &sendL,
+ });
+}
+
+fn startSendLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
var msgs: [8]Message = undefined;
- while (true) {
- const len = try queue.get(io, &msgs, 1);
- std.debug.assert(len <= msgs.len);
- for (0..len) |i| {
- const msg = msgs[i];
- defer switch (msg) {
- .msg => |m| m.deinit(alloc),
- else => {},
- };
- errdefer {
- for (msgs[i + 1 .. len]) |mg| switch (mg) {
- .msg => |m| {
- m.deinit(alloc);
- },
+ if (self.send_queue) |queue| {
+ while (true) {
+ const len = try queue.get(io, &msgs, 1);
+ std.debug.assert(len <= msgs.len);
+ for (0..len) |i| {
+ const msg = msgs[i];
+ defer switch (msg) {
+ .msg => |m| m.deinit(alloc),
else => {},
};
- }
- switch (msg) {
- .@"+ok" => {
- _ = try self.to_client.write("+OK\r\n");
- },
- .pong => {
- _ = try self.to_client.write("PONG\r\n");
- },
- .info => |info| {
- _ = try self.to_client.write("INFO ");
- try std.json.Stringify.value(info, .{}, self.to_client);
- _ = try self.to_client.write("\r\n");
- },
- .msg => |m| {
- try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n{s}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
- m.payload,
+ errdefer {
+ for (msgs[i + 1 .. len]) |mg| switch (mg) {
+ .msg => |m| {
+ m.deinit(alloc);
},
- );
- },
- .@"-err" => |s| {
- _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
- },
- else => |m| {
- std.debug.panic("unimplemented write: {any}\n", .{m});
- },
+ else => {},
+ };
+ }
+ switch (msg) {
+ .@"+ok" => {
+ _ = try self.to_client.write("+OK\r\n");
+ },
+ .pong => {
+ _ = try self.to_client.write("PONG\r\n");
+ },
+ .info => |info| {
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
+ },
+ .msg => |m| {
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n{s}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ m.payload,
+ },
+ );
+ },
+ .@"-err" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ },
+ else => |m| {
+ std.debug.panic("unimplemented write: {any}\n", .{m});
+ },
+ }
}
+ try self.to_client.flush();
}
- try self.to_client.flush();
- }
+ } else unreachable;
}
-pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+fn startRecvLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
if (self.recv_queue) |queue| {
+ while (Message.next(alloc, self.from_client)) |msg| {
+ try queue.putOne(io, msg);
+ } else |_| {}
+ } else unreachable;
+}
+
+pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+ if (self.send_queue) |queue| {
try queue.putOne(io, msg);
} else @panic("Must start() the client before sending it messages.");
}
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
- return Message.next(allocator, self.from_client);
+ return;
}
test {
diff --git a/src/server/main.zig b/src/server/main.zig
index 044f551..c24a758 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -160,11 +160,11 @@ fn handleConnection(
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
- var qbuf: [8]Message = undefined;
- var queue: std.Io.Queue(Message) = .init(&qbuf);
+ var sdqbuf: [8]Message = undefined;
+ var send_queue: std.Io.Queue(Message) = .init(&sdqbuf);
defer {
- queue.close(io);
- while (queue.getOne(io)) |msg| {
+ send_queue.close(io);
+ while (send_queue.getOne(io)) |msg| {
switch (msg) {
.msg => |m| m.deinit(server_allocator),
else => {},
@@ -172,7 +172,19 @@ fn handleConnection(
} else |_| {}
}
- var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
+ var rcqbuf: [8]Message = undefined;
+ var recv_queue: std.Io.Queue(Message) = .init(&rcqbuf);
+ defer {
+ recv_queue.close(io);
+ while (recv_queue.getOne(io)) |msg| {
+ switch (msg) {
+ .msg => |m| m.deinit(server_allocator),
+ else => {},
+ }
+ } else |_| {}
+ }
+
+ var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &send_queue, &recv_queue });
defer client_task.cancel(io) catch {};
try io.sleep(std.Io.Duration.fromMilliseconds(5), .real);