diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 18:45:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 20:43:49 -0500 |
| commit | b87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch) | |
| tree | 00613d0d3f7178d0c5b974ce04a752443e9a816e /src/Server/Client.zig | |
| parent | 025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff) | |
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/Server/Client.zig')
| -rw-r--r-- | src/Server/Client.zig | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig new file mode 100644 index 0000000..dff3534 --- /dev/null +++ b/src/Server/Client.zig @@ -0,0 +1,240 @@ +const Message = @import("message_parser.zig").Message; +const std = @import("std"); +const Queue = std.Io.Queue; + +const Client = @This(); + +pub const Msgs = union(enum) { + MSG: Message.Msg, + HMSG: Message.HMsg, +}; + +connect: ?Message.Connect, +// Used to own messages that we receive in our queues. +alloc: std.mem.Allocator, + +// Messages for this client to receive. +recv_queue: *Queue(Message), +msg_queue: *Queue(Msgs), + +from_client: *std.Io.Reader, +to_client: *std.Io.Writer, + +pub fn init( + connect: ?Message.Connect, + alloc: std.mem.Allocator, + recv_queue: *Queue(Message), + msg_queue: *Queue(Msgs), + in: *std.Io.Reader, + out: *std.Io.Writer, +) Client { + return .{ + .connect = connect, + .alloc = alloc, + .recv_queue = recv_queue, + .msg_queue = msg_queue, + .from_client = in, + .to_client = out, + }; +} + +pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { + if (self.connect) |c| { + c.deinit(alloc); + } + self.* = undefined; +} + +pub fn start(self: *Client, io: std.Io) !void { + var msgs_buf: [1024]Msgs = undefined; + + var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); + errdefer _ = recv_msgs_task.cancel(io) catch {}; + + var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + errdefer _ = recv_proto_task.cancel(io) catch {}; + + while (true) { + switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + .msgs => |len_err| { + @branchHint(.likely); + const msgs = msgs_buf[0..try len_err]; + for (0..msgs.len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .MSG => |m| m.deinit(self.alloc), + .HMSG => |h| h.deinit(self.alloc), + }; + errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + .MSG => |m| { + m.deinit(self.alloc); + }, + .HMSG => |h| { + h.deinit(self.alloc); + }, + }; + switch (msg) { + .MSG => |m| { + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + }, + ); + try m.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); + }, + .HMSG => |hmsg| { + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + }); + try hmsg.msg.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); + }, + } + } + recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; + }, + .proto => |msg_err| { + @branchHint(.unlikely); + const msg = try msg_err; + switch (msg) { + .@"+OK" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .PONG => { + _ = try self.to_client.write("PONG\r\n"); + }, + .INFO => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .@"-ERR" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + }, + } + try self.to_client.flush(); + } +} + +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + switch (msg) { + .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), + .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), + else => try self.recv_queue.putOne(io, msg), + } +} + +test send { + const io = std.testing.io; + const gpa = std.testing.allocator; + + var to_client: std.Io.Writer = .fixed(blk: { + var buf: [1024]u8 = undefined; + break :blk &buf; + }); + var recv_queue: Queue(Message) = .init(&.{}); + var msgs_queue: Queue(Msgs) = .init(blk: { + var buf: [1]Msgs = undefined; + break :blk &buf; + }); + var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + defer client.deinit(gpa); + + var c_task = try io.concurrent(Client.start, .{ &client, io }); + defer c_task.cancel(io) catch {}; + + { + try client.send(io, .PONG); + // Wait for the concurrent client task to write to the writer + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); + } + + to_client.end = 0; + + { + const payload = "payload"; + const msg: Message.Msg = .{ + .sid = "1", + .subject = "subject", + .reply_to = "reply", + .payload = .{ + .len = payload.len, + .short = blk: { + var buf: [128]u8 = undefined; + @memcpy(buf[0..payload.len], payload); + break :blk buf; + }, + .long = null, + }, + }; + try client.send(io, .{ + // msg must be owned by the allocator the client uses + .MSG = try msg.dupe(gpa), + }); + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered()); + } +} + +pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { + return Message.next(allocator, self.from_client); +} + +test next { + const gpa = std.testing.allocator; + + var from_client: std.Io.Reader = .fixed( + "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ + "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ + "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ + "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ + "PING\r\n", + ); + + var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined); + + { + // Simulate stream + + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg)); + defer msg.CONNECT.deinit(gpa); + try std.testing.expectEqualDeep(Message{ + .CONNECT = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + }, + }, msg); + } + + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.PING, std.meta.activeTag(msg)); + } + } +} |
