summaryrefslogtreecommitdiff
path: root/src/Server
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 18:45:17 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 20:43:49 -0500
commitb87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch)
tree00613d0d3f7178d0c5b974ce04a752443e9a816e /src/Server
parent025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff)
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/Server')
-rw-r--r--src/Server/Client.zig240
-rw-r--r--src/Server/message_parser.zig1141
2 files changed, 1381 insertions, 0 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
new file mode 100644
index 0000000..dff3534
--- /dev/null
+++ b/src/Server/Client.zig
@@ -0,0 +1,240 @@
+const Message = @import("message_parser.zig").Message;
+const std = @import("std");
+const Queue = std.Io.Queue;
+
+const Client = @This();
+
+pub const Msgs = union(enum) {
+ MSG: Message.Msg,
+ HMSG: Message.HMsg,
+};
+
+connect: ?Message.Connect,
+// Used to own messages that we receive in our queues.
+alloc: std.mem.Allocator,
+
+// Messages for this client to receive.
+recv_queue: *Queue(Message),
+msg_queue: *Queue(Msgs),
+
+from_client: *std.Io.Reader,
+to_client: *std.Io.Writer,
+
+pub fn init(
+ connect: ?Message.Connect,
+ alloc: std.mem.Allocator,
+ recv_queue: *Queue(Message),
+ msg_queue: *Queue(Msgs),
+ in: *std.Io.Reader,
+ out: *std.Io.Writer,
+) Client {
+ return .{
+ .connect = connect,
+ .alloc = alloc,
+ .recv_queue = recv_queue,
+ .msg_queue = msg_queue,
+ .from_client = in,
+ .to_client = out,
+ };
+}
+
+pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
+ if (self.connect) |c| {
+ c.deinit(alloc);
+ }
+ self.* = undefined;
+}
+
+pub fn start(self: *Client, io: std.Io) !void {
+ var msgs_buf: [1024]Msgs = undefined;
+
+ var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
+ errdefer _ = recv_msgs_task.cancel(io) catch {};
+
+ var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ errdefer _ = recv_proto_task.cancel(io) catch {};
+
+ while (true) {
+ switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
+ .msgs => |len_err| {
+ @branchHint(.likely);
+ const msgs = msgs_buf[0..try len_err];
+ for (0..msgs.len) |i| {
+ const msg = msgs[i];
+ defer switch (msg) {
+ .MSG => |m| m.deinit(self.alloc),
+ .HMSG => |h| h.deinit(self.alloc),
+ };
+ errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
+ .MSG => |m| {
+ m.deinit(self.alloc);
+ },
+ .HMSG => |h| {
+ h.deinit(self.alloc);
+ },
+ };
+ switch (msg) {
+ .MSG => |m| {
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ },
+ );
+ try m.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
+ },
+ .HMSG => |hmsg| {
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
+ hmsg.msg.subject,
+ hmsg.msg.sid,
+ hmsg.msg.reply_to orelse "",
+ hmsg.header_bytes,
+ hmsg.msg.payload.len,
+ });
+ try hmsg.msg.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
+ },
+ }
+ }
+ recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
+ },
+ .proto => |msg_err| {
+ @branchHint(.unlikely);
+ const msg = try msg_err;
+ switch (msg) {
+ .@"+OK" => {
+ _ = try self.to_client.write("+OK\r\n");
+ },
+ .PONG => {
+ _ = try self.to_client.write("PONG\r\n");
+ },
+ .INFO => |info| {
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
+ },
+ .@"-ERR" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ },
+ else => |m| {
+ std.debug.panic("unimplemented write: {any}\n", .{m});
+ },
+ }
+ recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ },
+ }
+ try self.to_client.flush();
+ }
+}
+
+pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+ switch (msg) {
+ .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
+ .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
+ else => try self.recv_queue.putOne(io, msg),
+ }
+}
+
+test send {
+ const io = std.testing.io;
+ const gpa = std.testing.allocator;
+
+ var to_client: std.Io.Writer = .fixed(blk: {
+ var buf: [1024]u8 = undefined;
+ break :blk &buf;
+ });
+ var recv_queue: Queue(Message) = .init(&.{});
+ var msgs_queue: Queue(Msgs) = .init(blk: {
+ var buf: [1]Msgs = undefined;
+ break :blk &buf;
+ });
+ var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
+ defer client.deinit(gpa);
+
+ var c_task = try io.concurrent(Client.start, .{ &client, io });
+ defer c_task.cancel(io) catch {};
+
+ {
+ try client.send(io, .PONG);
+ // Wait for the concurrent client task to write to the writer
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
+ }
+
+ to_client.end = 0;
+
+ {
+ const payload = "payload";
+ const msg: Message.Msg = .{
+ .sid = "1",
+ .subject = "subject",
+ .reply_to = "reply",
+ .payload = .{
+ .len = payload.len,
+ .short = blk: {
+ var buf: [128]u8 = undefined;
+ @memcpy(buf[0..payload.len], payload);
+ break :blk buf;
+ },
+ .long = null,
+ },
+ };
+ try client.send(io, .{
+ // msg must be owned by the allocator the client uses
+ .MSG = try msg.dupe(gpa),
+ });
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered());
+ }
+}
+
+pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
+ return Message.next(allocator, self.from_client);
+}
+
+test next {
+ const gpa = std.testing.allocator;
+
+ var from_client: std.Io.Reader = .fixed(
+ "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
+ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++
+ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
+ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
+ "PING\r\n",
+ );
+
+ var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined);
+
+ {
+ // Simulate stream
+
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
+ defer msg.CONNECT.deinit(gpa);
+ try std.testing.expectEqualDeep(Message{
+ .CONNECT = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
+ },
+ }, msg);
+ }
+
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
+ }
+ }
+}
diff --git a/src/Server/message_parser.zig b/src/Server/message_parser.zig
new file mode 100644
index 0000000..fd1b5b1
--- /dev/null
+++ b/src/Server/message_parser.zig
@@ -0,0 +1,1141 @@
+const std = @import("std");
+const Allocator = std.mem.Allocator;
+const ArenaAllocator = std.heap.ArenaAllocator;
+const ArrayList = std.ArrayList;
+const StaticStringMap = std.StaticStringMap;
+
+const Io = std.Io;
+const Writer = Io.Writer;
+const AllocatingWriter = Writer.Allocating;
+const Reader = Io.Reader;
+
+const ascii = std.ascii;
+const isDigit = std.ascii.isDigit;
+const isUpper = std.ascii.isUpper;
+const isWhitespace = std.ascii.isWhitespace;
+
+const parseUnsigned = std.fmt.parseUnsigned;
+
+const log = std.log;
+
+pub const Payload = struct {
+ len: u32,
+ short: [128]u8,
+ long: ?[]u8,
+
+ pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload {
+ var res: Payload = .{
+ .len = @intCast(bytes),
+ .short = undefined,
+ .long = null,
+ };
+
+ try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]);
+ if (bytes > res.short.len) {
+ const long = try alloc.alloc(u8, bytes - res.short.len);
+ errdefer alloc.free(long);
+ try in.readSliceAll(long);
+ res.long = long;
+ }
+ return res;
+ }
+
+ pub fn write(self: Payload, out: *Writer) !void {
+ std.debug.assert(out.buffer.len >= self.short.len);
+ std.debug.assert(self.len <= self.short.len or self.long != null);
+ try out.writeAll(self.short[0..@min(self.len, self.short.len)]);
+ if (self.long) |l| {
+ try out.writeAll(l);
+ }
+ }
+
+ pub fn deinit(self: Payload, alloc: Allocator) void {
+ if (self.long) |l| {
+ alloc.free(l);
+ }
+ }
+
+ pub fn dupe(self: Payload, alloc: Allocator) !Payload {
+ var res = self;
+ if (self.long) |l| {
+ res.long = try alloc.dupe(u8, l);
+ }
+ errdefer if (res.long) |l| alloc.free(l);
+ return res;
+ }
+};
+
+pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
+
+pub const Message = union(enum) {
+ INFO: ServerInfo,
+ CONNECT: Connect,
+ PUB: Pub,
+ HPUB: HPub,
+ SUB: Sub,
+ UNSUB: Unsub,
+ MSG: Msg,
+ HMSG: HMsg,
+ PING,
+ PONG,
+ @"+OK": void,
+ @"-ERR": []const u8,
+ pub const ServerInfo = struct {
+ /// The unique identifier of the NATS server.
+ server_id: []const u8,
+ /// The name of the NATS server.
+ server_name: []const u8,
+ /// The version of NATS.
+ version: []const u8,
+ /// The version of golang the NATS server was built with.
+ go: []const u8 = "0.0.0",
+ /// The IP address used to start the NATS server,
+ /// by default this will be 0.0.0.0 and can be
+ /// configured with -client_advertise host:port.
+ host: []const u8 = "0.0.0.0",
+ /// The port number the NATS server is configured
+ /// to listen on.
+ port: u16 = 4222,
+ /// Whether the server supports headers.
+ headers: bool = false,
+ /// Maximum payload size, in bytes, that the server
+ /// will accept from the client.
+ max_payload: u64,
+ /// An integer indicating the protocol version of
+ /// the server. The server version 1.2.0 sets this
+ /// to 1 to indicate that it supports the "Echo"
+ /// feature.
+ proto: u32 = 1,
+ };
+ pub const Connect = struct {
+ verbose: bool = false,
+ pedantic: bool = false,
+ tls_required: bool = false,
+ auth_token: ?[]const u8 = null,
+ user: ?[]const u8 = null,
+ pass: ?[]const u8 = null,
+ name: ?[]const u8 = null,
+ lang: []const u8,
+ version: []const u8,
+ protocol: u32,
+ echo: ?bool = null,
+ sig: ?[]const u8 = null,
+ jwt: ?[]const u8 = null,
+ no_responders: ?bool = null,
+ headers: ?bool = null,
+ nkey: ?[]const u8 = null,
+
+ pub fn deinit(self: Connect, alloc: Allocator) void {
+ if (self.auth_token) |a| alloc.free(a);
+ if (self.user) |u| alloc.free(u);
+ if (self.pass) |p| alloc.free(p);
+ if (self.name) |n| alloc.free(n);
+ alloc.free(self.lang);
+ alloc.free(self.version);
+ if (self.sig) |s| alloc.free(s);
+ if (self.jwt) |j| alloc.free(j);
+ if (self.nkey) |n| alloc.free(n);
+ }
+
+ pub fn dupe(self: Connect, alloc: Allocator) !Connect {
+ var res = self;
+ res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null;
+ errdefer if (res.auth_token) |a| alloc.free(a);
+ res.user = if (self.user) |u| try alloc.dupe(u8, u) else null;
+ errdefer if (res.user) |u| alloc.free(u);
+ res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null;
+ errdefer if (res.pass) |p| alloc.free(p);
+ res.name = if (self.name) |n| try alloc.dupe(u8, n) else null;
+ errdefer if (res.name) |n| alloc.free(n);
+ res.lang = try alloc.dupe(u8, self.lang);
+ errdefer alloc.free(res.lang);
+ res.version = try alloc.dupe(u8, self.version);
+ errdefer alloc.free(res.version);
+ res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null;
+ errdefer if (res.sig) |s| alloc.free(s);
+ res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null;
+ errdefer if (res.jwt) |j| alloc.free(j);
+ res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null;
+ errdefer if (res.nkey) |n| alloc.free(n);
+ return res;
+ }
+ };
+ pub const Pub = struct {
+ /// The destination subject to publish to.
+ subject: []const u8,
+ /// The reply subject that subscribers can use to send a response back to the publisher/requestor.
+ reply_to: ?[]const u8 = null,
+ /// The message payload data.
+ payload: Payload,
+
+ pub fn deinit(self: Pub, alloc: Allocator) void {
+ alloc.free(self.subject);
+ self.payload.deinit(alloc);
+ if (self.reply_to) |r| alloc.free(r);
+ }
+
+ pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
+ const res: Msg = .{
+ .subject = self.subject,
+ .sid = sid,
+ .reply_to = self.reply_to,
+ .payload = self.payload,
+ };
+ return res.dupe(alloc);
+ }
+ };
+ pub const HPub = struct {
+ header_bytes: usize,
+ @"pub": Pub,
+
+ pub fn deinit(self: HPub, alloc: Allocator) void {
+ self.@"pub".deinit(alloc);
+ }
+
+ pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
+ return .{
+ .header_bytes = self.header_bytes,
+ .msg = try self.@"pub".toMsg(alloc, sid),
+ };
+ }
+ };
+
+ pub const HMsg = struct {
+ header_bytes: usize,
+ msg: Msg,
+
+ pub fn deinit(self: HMsg, alloc: Allocator) void {
+ self.msg.deinit(alloc);
+ }
+
+ pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
+ var res = self;
+ res.msg = try self.msg.dupe(alloc);
+ errdefer alloc.free(res.msg);
+ return res;
+ }
+ };
+ pub const Sub = struct {
+ /// The subject name to subscribe to.
+ subject: []const u8,
+ /// If specified, the subscriber will join this queue group.
+ queue_group: ?[]const u8,
+ /// A unique alphanumeric subscription ID, generated by the client.
+ sid: []const u8,
+
+ pub fn deinit(self: Sub, alloc: Allocator) void {
+ alloc.free(self.subject);
+ alloc.free(self.sid);
+ if (self.queue_group) |q| alloc.free(q);
+ }
+ };
+ pub const Unsub = struct {
+ /// The unique alphanumeric subscription ID of the subject to unsubscribe from.
+ sid: []const u8,
+ /// A number of messages to wait for before automatically unsubscribing.
+ max_msgs: ?usize = null,
+
+ pub fn deinit(self: Unsub, alloc: Allocator) void {
+ alloc.free(self.sid);
+ }
+ };
+ pub const Msg = struct {
+ subject: []const u8,
+ sid: []const u8,
+ reply_to: ?[]const u8,
+ payload: Payload,
+
+ pub fn deinit(self: Msg, alloc: Allocator) void {
+ alloc.free(self.subject);
+ alloc.free(self.sid);
+ if (self.reply_to) |r| alloc.free(r);
+ self.payload.deinit(alloc);
+ }
+
+ pub fn dupe(self: Msg, alloc: Allocator) !Msg {
+ var res: Msg = undefined;
+ res.subject = try alloc.dupe(u8, self.subject);
+ errdefer alloc.free(res.subject);
+ res.sid = try alloc.dupe(u8, self.sid);
+ errdefer alloc.free(res.sid);
+ res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
+ errdefer if (res.reply_to) |r| alloc.free(r);
+ res.payload = try self.payload.dupe(alloc);
+ errdefer alloc.free(res.payload);
+ return res;
+ }
+ };
+
+ const client_types = StaticStringMap(MessageType).initComptime(
+ .{
+ // {"INFO", .info},
+ .{ @tagName(.CONNECT), .CONNECT },
+ .{ @tagName(.PUB), .PUB },
+ .{ @tagName(.HPUB), .HPUB },
+ .{ @tagName(.SUB), .SUB },
+ .{ @tagName(.UNSUB), .UNSUB },
+ // {"MSG", .msg},
+ // {"HMSG", .hmsg},
+ .{ @tagName(.PING), .PING },
+ .{ @tagName(.PONG), .PONG },
+ // {"+OK", .@"+ok"},
+ // {"-ERR", .@"-err"},
+ },
+ );
+ fn parseStaticStringMap(input: []const u8) ?MessageType {
+ return client_types.get(input);
+ }
+
+ pub const parse = parseStaticStringMap;
+
+ /// An error should be handled by cleaning up this connection.
+ pub fn next(alloc: Allocator, in: *Reader) !Message {
+ var operation_string: ArrayList(u8) = blk: {
+ comptime var buf_len = 0;
+ comptime {
+ for (client_types.keys()) |key| {
+ buf_len = @max(buf_len, key.len);
+ }
+ }
+ var buf: [buf_len]u8 = undefined;
+ break :blk .initBuffer(&buf);
+ };
+
+ while (in.peekByte()) |byte| {
+ if (isUpper(byte)) {
+ try operation_string.appendBounded(byte);
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ const operation = parse(operation_string.items) orelse {
+ log.err("Invalid operation: '{s}'", .{operation_string.items});
+ return error.InvalidOperation;
+ };
+
+ errdefer log.err("Failed to parse {s}", .{operation_string.items});
+
+ switch (operation) {
+ .CONNECT => {
+ return parseConnect(alloc, in);
+ },
+ .PUB => {
+ @branchHint(.likely);
+ return parsePub(alloc, in);
+ },
+ .HPUB => {
+ @branchHint(.likely);
+ return parseHPub(alloc, in);
+ },
+ .PING => {
+ try expectStreamBytes(in, "\r\n");
+ return .PING;
+ },
+ .PONG => {
+ try expectStreamBytes(in, "\r\n");
+ return .PONG;
+ },
+ .SUB => {
+ return parseSub(alloc, in);
+ },
+ .UNSUB => {
+ return parseUnsub(alloc, in);
+ },
+ else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
+ }
+ }
+};
+
+fn parseConnect(alloc: Allocator, in: *Reader) !Message {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
+
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
+
+ try in.discardAll(1); // throw away space
+
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ // TODO: should be CONNECTION allocator
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return .{ .CONNECT = try res.dupe(alloc) };
+}
+
+fn parseSub(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+ const subject = try readSubject(alloc, in, .sub);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ errdefer second.deinit(alloc);
+ var third: ?ArrayList(u8) = null;
+ errdefer if (third) |*t| t.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ return .{
+ .SUB = .{
+ .subject = subject,
+ .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
+ .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
+ },
+ };
+}
+
+test parseSub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo q 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" 1 q 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "1",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" $SRV.PING 4\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "$SRV.PING",
+ .queue_group = null,
+ .sid = "4",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo.echo q 10\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo.echo",
+ .queue_group = "q",
+ .sid = "10",
+ },
+ },
+ res,
+ );
+ }
+}
+
+fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
+ const States = enum {
+ before_first,
+ in_first,
+ after_first,
+ in_second,
+ in_end,
+ };
+
+ var first: ArrayList(u8) = .empty;
+ errdefer first.deinit(alloc);
+ var second: ?ArrayList(u8) = null;
+ defer if (second) |*s| s.deinit(alloc);
+
+ sw: switch (@as(States, .before_first)) {
+ .before_first => {
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_first;
+ }
+ continue :sw .in_first;
+ },
+ .in_first => {
+ const byte = try in.peekByte();
+ if (!isWhitespace(byte)) {
+ try first.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_first;
+ }
+ continue :sw .after_first;
+ },
+ .after_first => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_first;
+ }
+ second = .empty;
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ }
+ try second.?.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_second;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ return .{
+ .UNSUB = .{
+ .sid = try first.toOwnedSlice(alloc),
+ .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
+ },
+ };
+}
+
+test parseUnsub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" 1\r\n");
+ var res = try parseUnsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = null,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" 1 1\r\n");
+ var res = try parseUnsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = 1,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+fn parsePub(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+ errdefer alloc.free(subject);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: ?ArrayList(u8) = null;
+ defer if (third) |*t| t.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ const reply_to: ?[]const u8, const bytes: usize =
+ if (third) |t| .{
+ try alloc.dupe(u8, second.items),
+ try parseUnsigned(usize, t.items, 10),
+ } else .{
+ null,
+ try parseUnsigned(usize, second.items, 10),
+ };
+
+ const payload: Payload = try .read(alloc, in, bytes);
+ errdefer payload.deinit(alloc);
+ try expectStreamBytes(in, "\r\n");
+
+ return .{
+ .PUB = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ };
+}
+
+test parsePub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ // numeric reply subject
+ {
+ var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "5",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+fn parseHPub(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+ errdefer alloc.free(subject);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ after_third,
+ in_fourth,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: ArrayList(u8) = .empty;
+ defer third.deinit(alloc);
+ var fourth: ?ArrayList(u8) = null;
+ defer if (fourth) |*f| f.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_third;
+ },
+ .after_third => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_third;
+ }
+ fourth = .empty;
+ continue :sw .in_fourth;
+ },
+ .in_fourth => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
+ if (fourth) |f| .{
+ try alloc.dupe(u8, second.items),
+ try parseUnsigned(usize, third.items, 10),
+ try parseUnsigned(usize, f.items, 10),
+ } else .{
+ null,
+ try parseUnsigned(usize, second.items, 10),
+ try parseUnsigned(usize, third.items, 10),
+ };
+
+ const payload: Payload = try .read(alloc, in, total_bytes);
+ errdefer payload.deinit(alloc);
+ try expectStreamBytes(in, "\r\n");
+
+ return .{
+ .HPUB = .{
+ .header_bytes = header_bytes,
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ },
+ };
+}
+
+test parseHPub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "6",
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
+ var subject_list: ArrayList(u8) = .empty;
+ errdefer subject_list.deinit(alloc);
+
+ // Handle the first character
+ {
+ const byte = try in.takeByte();
+ if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
+ return error.InvalidStream;
+
+ try subject_list.append(alloc, byte);
+ }
+
+ switch (pub_or_sub) {
+ .sub => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '>') {
+ const next_byte = try in.takeByte();
+ if (!isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '*') {
+ const next_byte = try in.peekByte();
+ if (next_byte != '.' and !isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ .@"pub" => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '*' or byte == '>') return error.InvalidStream;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ }
+
+ return subject_list.toOwnedSlice(alloc);
+}
+
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+ if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
+
+test "parsing a stream" {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
+ "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
+ ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
+ "\nPUB hi 3\r\nfoo\r\n";
+ var reader: Reader = .fixed(input);
+ var arena: ArenaAllocator = .init(alloc);
+ defer arena.deinit();
+ const gpa = arena.allocator();
+
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{
+ .CONNECT = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
+ },
+ };
+
+ try expectEqualDeep(expected, msg);
+ }
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{
+ .PUB = .{
+ .subject = "hi",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "foo";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ };
+ try expectEqualDeep(expected, msg);
+ }
+}