//! A connected client.
//!
//! Bytes destined for the client are placed on `recv_queue` by `send` and
//! forwarded to `to_client` by `start`. Control messages coming from the
//! client are parsed from `from_client` by `next`.

const message = @import("message.zig");
const parse = message.parse;
const Message = message.Message;
const std = @import("std");
const Queue = std.Io.Queue;

const Client = @This();

/// Tagged union over the two message payload types, `Message.Msg` and
/// `Message.HMsg`.
pub const Msgs = union(enum) {
    MSG: Message.Msg,
    HMSG: Message.HMsg,
};

/// CONNECT options for this client, or `null` if none were given to `init`.
/// Released by `deinit`.
connect: ?Message.Connect,
/// Byte queue for this client to receive.
recv_queue: *Queue(u8),
/// Only necessary to hold this lock for writing to the queue (to avoid
/// interleaving message writes).
recv_queue_write_lock: std.Io.Mutex = .init,
/// Reader that `next` parses control messages from.
from_client: *std.Io.Reader,
/// Writer that `start` forwards queued bytes to.
to_client: *std.Io.Writer,

/// Builds a `Client` from its parts. Nothing is allocated or copied; the
/// queue, reader and writer are stored as the pointers passed in.
pub fn init(
    connect: ?Message.Connect,
    recv_queue: *Queue(u8),
    in: *std.Io.Reader,
    out: *std.Io.Writer,
) Client {
    return .{
        .connect = connect,
        .recv_queue = recv_queue,
        .from_client = in,
        .to_client = out,
    };
}

/// Releases `connect` (if present) with `alloc` and invalidates `self`.
/// The queue, reader and writer are not touched.
pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
    if (self.connect) |c| {
        c.deinit(alloc);
    }
    self.* = undefined;
}

/// Forwards bytes from `recv_queue` to `to_client` in a loop that only ends
/// when the queue read or the flush returns an error.
///
/// The writer must have a non-empty buffer with nothing buffered on entry:
/// queued bytes are received directly into the writer's buffer and then
/// flushed, so no intermediate copy is made.
pub fn start(self: *Client, io: std.Io) !void {
    std.debug.assert(self.to_client.buffer.len > 0);
    std.debug.assert(self.to_client.end == 0);

    while (true) {
        // Receive straight into the writer's buffer and mark that many bytes
        // as buffered. The trailing `1` is presumably the minimum number of
        // bytes to wait for -- confirm against `std.Io.Queue.get`.
        self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1);
        try self.to_client.flush();
    }
}

/// Queues the already-serialized bytes `msg` for delivery to the client.
///
/// NOTE(review): this does not acquire `recv_queue_write_lock`; presumably
/// callers that may race with other senders hold it around this call --
/// confirm against the call sites.
pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
    try self.recv_queue.putAll(io, msg);
}

test send {
    const io = std.testing.io;
    const gpa = std.testing.allocator;

    // The buffer lives at test scope so the writer never points at a variable
    // whose block has already ended.
    var to_client_buf: [1024]u8 = undefined;
    var to_client: std.Io.Writer = .fixed(&to_client_buf);
    var recv_queue: Queue(u8) = .init(&.{});
    var client: Client = .init(null, &recv_queue, undefined, &to_client);
    defer client.deinit(gpa);

    var c_task = try io.concurrent(Client.start, .{ &client, io });
    defer c_task.cancel(io) catch {};

    {
        try client.send(io, "PONG\r\n");
        // Wait for the concurrent client task to write to the writer
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
    }

    to_client.end = 0;

    {
        // `send` takes raw bytes, so the message is given in its wire form.
        const wire = "MSG subject 1 reply 7\r\npayload\r\n";
        try client.send(io, wire);
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, wire, to_client.buffered());
    }
}

/// Parses the next control message from `from_client` by delegating to
/// `parse.control`; any error it returns is propagated.
pub fn next(self: *Client) !message.Control {
    return parse.control(self.from_client);
}

test next {
    const gpa = std.testing.allocator;

    var from_client: std.Io.Reader = .fixed(
        "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ "PING\r\n",
    );
    // Only the reader is exercised here; the queue and writer stay undefined.
    var client: Client = .init(null, undefined, &from_client, undefined);

    {
        // Simulate stream
        // NOTE(review): the expectations below are written against `Message`,
        // while `next` returns `message.Control` -- confirm they still match
        // that type's shape.
        {
            const msg = try client.next();
            try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
            defer msg.CONNECT.deinit(gpa);
            try std.testing.expectEqualDeep(Message{
                .CONNECT = .{
                    .verbose = false,
                    .pedantic = false,
                    .tls_required = false,
                    .name = "NATS CLI Version v0.2.4",
                    .lang = "go",
                    .version = "1.43.0",
                    .protocol = 1,
                    .echo = true,
                    .headers = true,
                    .no_responders = true,
                },
            }, msg);
        }
        {
            const msg = try client.next();
            try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
        }
    }
}