summaryrefslogtreecommitdiff
path: root/src/Server/Client.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 18:45:17 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 20:43:49 -0500
commitb87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch)
tree00613d0d3f7178d0c5b974ce04a752443e9a816e /src/Server/Client.zig
parent025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff)
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/Server/Client.zig')
-rw-r--r--src/Server/Client.zig240
1 files changed, 240 insertions, 0 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
new file mode 100644
index 0000000..dff3534
--- /dev/null
+++ b/src/Server/Client.zig
@@ -0,0 +1,240 @@
+const Message = @import("message_parser.zig").Message;
+const std = @import("std");
+const Queue = std.Io.Queue;
+
+const Client = @This();
+
+pub const Msgs = union(enum) {
+ MSG: Message.Msg,
+ HMSG: Message.HMsg,
+};
+
+connect: ?Message.Connect,
+// Used to own messages that we receive in our queues.
+alloc: std.mem.Allocator,
+
+// Messages for this client to receive.
+recv_queue: *Queue(Message),
+msg_queue: *Queue(Msgs),
+
+from_client: *std.Io.Reader,
+to_client: *std.Io.Writer,
+
+pub fn init(
+ connect: ?Message.Connect,
+ alloc: std.mem.Allocator,
+ recv_queue: *Queue(Message),
+ msg_queue: *Queue(Msgs),
+ in: *std.Io.Reader,
+ out: *std.Io.Writer,
+) Client {
+ return .{
+ .connect = connect,
+ .alloc = alloc,
+ .recv_queue = recv_queue,
+ .msg_queue = msg_queue,
+ .from_client = in,
+ .to_client = out,
+ };
+}
+
+pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
+ if (self.connect) |c| {
+ c.deinit(alloc);
+ }
+ self.* = undefined;
+}
+
+pub fn start(self: *Client, io: std.Io) !void {
+ var msgs_buf: [1024]Msgs = undefined;
+
+ var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
+ errdefer _ = recv_msgs_task.cancel(io) catch {};
+
+ var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ errdefer _ = recv_proto_task.cancel(io) catch {};
+
+ while (true) {
+ switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
+ .msgs => |len_err| {
+ @branchHint(.likely);
+ const msgs = msgs_buf[0..try len_err];
+ for (0..msgs.len) |i| {
+ const msg = msgs[i];
+ defer switch (msg) {
+ .MSG => |m| m.deinit(self.alloc),
+ .HMSG => |h| h.deinit(self.alloc),
+ };
+ errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
+ .MSG => |m| {
+ m.deinit(self.alloc);
+ },
+ .HMSG => |h| {
+ h.deinit(self.alloc);
+ },
+ };
+ switch (msg) {
+ .MSG => |m| {
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ },
+ );
+ try m.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
+ },
+ .HMSG => |hmsg| {
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
+ hmsg.msg.subject,
+ hmsg.msg.sid,
+ hmsg.msg.reply_to orelse "",
+ hmsg.header_bytes,
+ hmsg.msg.payload.len,
+ });
+ try hmsg.msg.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
+ },
+ }
+ }
+ recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
+ },
+ .proto => |msg_err| {
+ @branchHint(.unlikely);
+ const msg = try msg_err;
+ switch (msg) {
+ .@"+OK" => {
+ _ = try self.to_client.write("+OK\r\n");
+ },
+ .PONG => {
+ _ = try self.to_client.write("PONG\r\n");
+ },
+ .INFO => |info| {
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
+ },
+ .@"-ERR" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ },
+ else => |m| {
+ std.debug.panic("unimplemented write: {any}\n", .{m});
+ },
+ }
+ recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ },
+ }
+ try self.to_client.flush();
+ }
+}
+
+pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+ switch (msg) {
+ .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
+ .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
+ else => try self.recv_queue.putOne(io, msg),
+ }
+}
+
+test send {
+ const io = std.testing.io;
+ const gpa = std.testing.allocator;
+
+ var to_client: std.Io.Writer = .fixed(blk: {
+ var buf: [1024]u8 = undefined;
+ break :blk &buf;
+ });
+ var recv_queue: Queue(Message) = .init(&.{});
+ var msgs_queue: Queue(Msgs) = .init(blk: {
+ var buf: [1]Msgs = undefined;
+ break :blk &buf;
+ });
+ var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
+ defer client.deinit(gpa);
+
+ var c_task = try io.concurrent(Client.start, .{ &client, io });
+ defer c_task.cancel(io) catch {};
+
+ {
+ try client.send(io, .PONG);
+ // Wait for the concurrent client task to write to the writer
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
+ }
+
+ to_client.end = 0;
+
+ {
+ const payload = "payload";
+ const msg: Message.Msg = .{
+ .sid = "1",
+ .subject = "subject",
+ .reply_to = "reply",
+ .payload = .{
+ .len = payload.len,
+ .short = blk: {
+ var buf: [128]u8 = undefined;
+ @memcpy(buf[0..payload.len], payload);
+ break :blk buf;
+ },
+ .long = null,
+ },
+ };
+ try client.send(io, .{
+ // msg must be owned by the allocator the client uses
+ .MSG = try msg.dupe(gpa),
+ });
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered());
+ }
+}
+
+pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
+ return Message.next(allocator, self.from_client);
+}
+
+test next {
+ const gpa = std.testing.allocator;
+
+ var from_client: std.Io.Reader = .fixed(
+ "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
+ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++
+ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
+ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
+ "PING\r\n",
+ );
+
+ var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined);
+
+ {
+ // Simulate stream
+
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
+ defer msg.CONNECT.deinit(gpa);
+ try std.testing.expectEqualDeep(Message{
+ .CONNECT = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
+ },
+ }, msg);
+ }
+
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
+ }
+ }
+}