const message = @import("message.zig"); const parse = message.parse; const Message = message.Message; const std = @import("std"); const Queue = std.Io.Queue; const Client = @This(); connect: ?Message.Connect, // Byte queue for this client to receive. recv_queue: *Queue(u8), // Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes). recv_queue_write_lock: std.Io.Mutex = .init, from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, recv_queue: *Queue(u8), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, .recv_queue = recv_queue, .from_client = in, .to_client = out, }; } pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { if (self.connect) |c| { c.deinit(alloc); } self.* = undefined; } pub fn start(self: *Client, io: std.Io) !void { std.debug.assert(self.to_client.buffer.len > 0); std.debug.assert(self.to_client.end == 0); while (true) { self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); try self.to_client.flush(); } } pub fn send(self: *Client, io: std.Io, msg: []const u8) !void { try self.recv_queue.putAll(io, msg); } test send { const io = std.testing.io; const gpa = std.testing.allocator; var to_client: std.Io.Writer = .fixed(blk: { var buf: [1024]u8 = undefined; break :blk &buf; }); var recv_queue: Queue(u8) = .init(&.{}); var client: Client = .init(null, &recv_queue, undefined, &to_client); defer client.deinit(gpa); var c_task = try io.concurrent(Client.start, .{ &client, io }); defer c_task.cancel(io) catch {}; { try client.send(io, "PONG\r\n"); // Wait for the concurrent client task to write to the writer try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); } to_client.end = 0; { const payload = "payload"; const msg: Message.Msg = .{ .sid = "1", .subject = "subject", .reply_to = "reply", .payload = .{ .len = payload.len, .short = blk: { var buf: [128]u8 = undefined; @memcpy(buf[0..payload.len], payload); break :blk buf; }, .long = null, }, }; try client.send(io, .{ // msg must be owned by the allocator the client uses .MSG = try msg.dupe(gpa), }); try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered()); } } pub fn next(self: *Client) !message.Control { return parse.control(self.from_client); } test next { const gpa = std.testing.allocator; var from_client: std.Io.Reader = .fixed( "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ "PING\r\n", ); var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined); { // Simulate stream { const msg = try client.next(gpa); try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg)); defer msg.CONNECT.deinit(gpa); try std.testing.expectEqualDeep(Message{ .CONNECT = .{ .verbose = false, .pedantic = false, .tls_required = false, .name = "NATS CLI Version v0.2.4", .lang = "go", .version = "1.43.0", .protocol = 1, .echo = true, .headers = true, .no_responders = true, }, }, msg); } { const msg = try client.next(gpa); try std.testing.expectEqual(.PING, std.meta.activeTag(msg)); } } }