const Message = @import("message_parser.zig").Message; const std = @import("std"); const Queue = std.Io.Queue; const Client = @This(); pub const Msgs = union(enum) { MSG: Message.Msg, HMSG: Message.HMsg, }; connect: ?Message.Connect, // Used to own messages that we receive in our queues. alloc: std.mem.Allocator, // Messages for this client to receive. recv_queue: *Queue(Message), msg_queue: *Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, alloc: std.mem.Allocator, recv_queue: *Queue(Message), msg_queue: *Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, .alloc = alloc, .recv_queue = recv_queue, .msg_queue = msg_queue, .from_client = in, .to_client = out, }; } pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { if (self.connect) |c| { c.deinit(alloc); } self.* = undefined; } pub fn start(self: *Client, io: std.Io) !void { var msgs_buf: [1024]Msgs = undefined; var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); errdefer _ = recv_msgs_task.cancel(io) catch {}; var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { .msgs => |len_err| { @branchHint(.likely); const msgs = msgs_buf[0..try len_err]; for (0..msgs.len) |i| { const msg = msgs[i]; defer switch (msg) { .MSG => |m| m.deinit(self.alloc), .HMSG => |h| h.deinit(self.alloc), }; errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { .MSG => |m| { m.deinit(self.alloc); }, .HMSG => |h| { h.deinit(self.alloc); }, }; switch (msg) { .MSG => |m| { try self.to_client.print( "MSG {s} {s} {s} {d}\r\n", .{ m.subject, m.sid, m.reply_to orelse "", m.payload.len, }, ); try m.payload.write(self.to_client); try self.to_client.print("\r\n", .{}); }, .HMSG => |hmsg| { try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ hmsg.msg.subject, hmsg.msg.sid, hmsg.msg.reply_to orelse "", hmsg.header_bytes, hmsg.msg.payload.len, }); try hmsg.msg.payload.write(self.to_client); try self.to_client.print("\r\n", .{}); }, } } recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; }, .proto => |msg_err| { @branchHint(.unlikely); const msg = try msg_err; switch (msg) { .@"+OK" => { _ = try self.to_client.write("+OK\r\n"); }, .PONG => { _ = try self.to_client.write("PONG\r\n"); }, .INFO => |info| { _ = try self.to_client.write("INFO "); try std.json.Stringify.value(info, .{}, self.to_client); _ = try self.to_client.write("\r\n"); }, .@"-ERR" => |s| { _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); }, else => |m| { std.debug.panic("unimplemented write: {any}\n", .{m}); }, } recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; }, } try self.to_client.flush(); } } pub fn send(self: *Client, io: std.Io, msg: Message) !void { switch (msg) { .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), else => try self.recv_queue.putOne(io, msg), } } test send { const io = std.testing.io; const gpa = std.testing.allocator; var to_client: std.Io.Writer = .fixed(blk: { var buf: [1024]u8 = undefined; break :blk &buf; }); var recv_queue: Queue(Message) = .init(&.{}); var msgs_queue: Queue(Msgs) = .init(blk: { var buf: [1]Msgs = undefined; break :blk &buf; }); var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); defer client.deinit(gpa); var c_task = try io.concurrent(Client.start, .{ &client, io }); defer c_task.cancel(io) catch {}; { try client.send(io, .PONG); // Wait for the concurrent client task to write to the writer try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); } to_client.end = 0; { const payload = "payload"; const msg: Message.Msg = .{ .sid = "1", .subject = "subject", .reply_to = "reply", .payload = .{ .len = payload.len, .short = blk: { var buf: [128]u8 = undefined; @memcpy(buf[0..payload.len], payload); break :blk buf; }, .long = null, }, }; try client.send(io, .{ // msg must be owned by the allocator the client uses .MSG = try msg.dupe(gpa), }); try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered()); } } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { return Message.next(allocator, self.from_client); } test next { const gpa = std.testing.allocator; var from_client: std.Io.Reader = .fixed( "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ "PING\r\n", ); var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined); { // Simulate stream { const msg = try client.next(gpa); try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg)); defer msg.CONNECT.deinit(gpa); try std.testing.expectEqualDeep(Message{ .CONNECT = .{ .verbose = false, .pedantic = false, .tls_required = false, .name = "NATS CLI Version v0.2.4", .lang = "go", .version = "1.43.0", .protocol = 1, .echo = true, .headers = true, .no_responders = true, }, }, msg); } { const msg = try client.next(gpa); try std.testing.expectEqual(.PING, std.meta.activeTag(msg)); } } }