diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 18:45:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 20:43:49 -0500 |
| commit | b87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch) | |
| tree | 00613d0d3f7178d0c5b974ce04a752443e9a816e /src/Server | |
| parent | 025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff) | |
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/Server')
| -rw-r--r-- | src/Server/Client.zig | 240 | ||||
| -rw-r--r-- | src/Server/message_parser.zig | 1141 |
2 files changed, 1381 insertions, 0 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig new file mode 100644 index 0000000..dff3534 --- /dev/null +++ b/src/Server/Client.zig @@ -0,0 +1,240 @@ +const Message = @import("message_parser.zig").Message; +const std = @import("std"); +const Queue = std.Io.Queue; + +const Client = @This(); + +pub const Msgs = union(enum) { + MSG: Message.Msg, + HMSG: Message.HMsg, +}; + +connect: ?Message.Connect, +// Used to own messages that we receive in our queues. +alloc: std.mem.Allocator, + +// Messages for this client to receive. +recv_queue: *Queue(Message), +msg_queue: *Queue(Msgs), + +from_client: *std.Io.Reader, +to_client: *std.Io.Writer, + +pub fn init( + connect: ?Message.Connect, + alloc: std.mem.Allocator, + recv_queue: *Queue(Message), + msg_queue: *Queue(Msgs), + in: *std.Io.Reader, + out: *std.Io.Writer, +) Client { + return .{ + .connect = connect, + .alloc = alloc, + .recv_queue = recv_queue, + .msg_queue = msg_queue, + .from_client = in, + .to_client = out, + }; +} + +pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { + if (self.connect) |c| { + c.deinit(alloc); + } + self.* = undefined; +} + +pub fn start(self: *Client, io: std.Io) !void { + var msgs_buf: [1024]Msgs = undefined; + + var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); + errdefer _ = recv_msgs_task.cancel(io) catch {}; + + var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + errdefer _ = recv_proto_task.cancel(io) catch {}; + + while (true) { + switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + .msgs => |len_err| { + @branchHint(.likely); + const msgs = msgs_buf[0..try len_err]; + for (0..msgs.len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .MSG => |m| m.deinit(self.alloc), + .HMSG => |h| h.deinit(self.alloc), + }; + errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + .MSG => |m| { + m.deinit(self.alloc); + }, + .HMSG => |h| { + h.deinit(self.alloc); + }, + }; + switch (msg) { + .MSG => |m| { + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + }, + ); + try m.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); + }, + .HMSG => |hmsg| { + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + }); + try hmsg.msg.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); + }, + } + } + recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; + }, + .proto => |msg_err| { + @branchHint(.unlikely); + const msg = try msg_err; + switch (msg) { + .@"+OK" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .PONG => { + _ = try self.to_client.write("PONG\r\n"); + }, + .INFO => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .@"-ERR" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + }, + } + try self.to_client.flush(); + } +} + +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + switch (msg) { + .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), + .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), + else => try self.recv_queue.putOne(io, msg), + } +} + +test send { + const io = std.testing.io; + const gpa = std.testing.allocator; + + var to_client: std.Io.Writer = .fixed(blk: { + var buf: [1024]u8 = undefined; + break :blk &buf; + }); + var recv_queue: Queue(Message) = .init(&.{}); + var msgs_queue: Queue(Msgs) = .init(blk: { + var buf: [1]Msgs = undefined; + break :blk &buf; + }); + var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + defer client.deinit(gpa); + + var c_task = try io.concurrent(Client.start, .{ &client, io }); + defer c_task.cancel(io) catch {}; + + { + try client.send(io, .PONG); + // Wait for the concurrent client task to write to the writer + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); + } + + to_client.end = 0; + + { + const payload = "payload"; + const msg: Message.Msg = .{ + .sid = "1", + .subject = "subject", + .reply_to = "reply", + .payload = .{ + .len = payload.len, + .short = blk: { + var buf: [128]u8 = undefined; + @memcpy(buf[0..payload.len], payload); + break :blk buf; + }, + .long = null, + }, + }; + try client.send(io, .{ + // msg must be owned by the allocator the client uses + .MSG = try msg.dupe(gpa), + }); + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered()); + } +} + +pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { + return Message.next(allocator, self.from_client); +} + +test next { + const gpa = std.testing.allocator; + + var from_client: std.Io.Reader = .fixed( + "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ + "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ + "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ + "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ + "PING\r\n", + ); + + var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined); + + { + // Simulate stream + + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg)); + defer msg.CONNECT.deinit(gpa); + try std.testing.expectEqualDeep(Message{ + .CONNECT = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + }, + }, msg); + } + + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.PING, std.meta.activeTag(msg)); + } + } +} diff --git a/src/Server/message_parser.zig b/src/Server/message_parser.zig new file mode 100644 index 0000000..fd1b5b1 --- /dev/null +++ b/src/Server/message_parser.zig @@ -0,0 +1,1141 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const ArenaAllocator = std.heap.ArenaAllocator; +const ArrayList = std.ArrayList; +const StaticStringMap = std.StaticStringMap; + +const Io = std.Io; +const Writer = Io.Writer; +const AllocatingWriter = Writer.Allocating; +const Reader = Io.Reader; + +const ascii = std.ascii; +const isDigit = std.ascii.isDigit; +const isUpper = std.ascii.isUpper; +const isWhitespace = std.ascii.isWhitespace; + +const parseUnsigned = std.fmt.parseUnsigned; + +const log = std.log; + +pub const Payload = struct { + len: u32, + short: [128]u8, + long: ?[]u8, + + pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; + } + + pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } + } + + pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } + } + + pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; + } +}; + +pub const MessageType = @typeInfo(Message).@"union".tag_type.?; + +pub const Message = union(enum) { + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + + pub fn deinit(self: Connect, alloc: Allocator) void { + if (self.auth_token) |a| alloc.free(a); + if (self.user) |u| alloc.free(u); + if (self.pass) |p| alloc.free(p); + if (self.name) |n| alloc.free(n); + alloc.free(self.lang); + alloc.free(self.version); + if (self.sig) |s| alloc.free(s); + if (self.jwt) |j| alloc.free(j); + if (self.nkey) |n| alloc.free(n); + } + + pub fn dupe(self: Connect, alloc: Allocator) !Connect { + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.lang = try alloc.dupe(u8, self.lang); + errdefer alloc.free(res.lang); + res.version = try alloc.dupe(u8, self.version); + errdefer alloc.free(res.version); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; + } + }; + pub const Pub = struct { + /// The destination subject to publish to. + subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. + reply_to: ?[]const u8 = null, + /// The message payload data. + payload: Payload, + + pub fn deinit(self: Pub, alloc: Allocator) void { + alloc.free(self.subject); + self.payload.deinit(alloc); + if (self.reply_to) |r| alloc.free(r); + } + + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + const res: Msg = .{ + .subject = self.subject, + .sid = sid, + .reply_to = self.reply_to, + .payload = self.payload, + }; + return res.dupe(alloc); + } + }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); + return res; + } + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + + pub fn deinit(self: Sub, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } + }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: Allocator) void { + alloc.free(self.sid); + } + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, + payload: Payload, + + pub fn deinit(self: Msg, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.reply_to) |r| alloc.free(r); + self.payload.deinit(alloc); + } + + pub fn dupe(self: Msg, alloc: Allocator) !Msg { + var res: Msg = undefined; + res.subject = try alloc.dupe(u8, self.subject); + errdefer alloc.free(res.subject); + res.sid = try alloc.dupe(u8, self.sid); + errdefer alloc.free(res.sid); + res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; + errdefer if (res.reply_to) |r| alloc.free(r); + res.payload = try self.payload.dupe(alloc); + errdefer alloc.free(res.payload); + return res; + } + }; + + const client_types = StaticStringMap(MessageType).initComptime( + .{ + // {"INFO", .info}, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, + // {"MSG", .msg}, + // {"HMSG", .hmsg}, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, + // {"+OK", .@"+ok"}, + // {"-ERR", .@"-err"}, + }, + ); + fn parseStaticStringMap(input: []const u8) ?MessageType { + return client_types.get(input); + } + + pub const parse = parseStaticStringMap; + + /// An error should be handled by cleaning up this connection. + pub fn next(alloc: Allocator, in: *Reader) !Message { + var operation_string: ArrayList(u8) = blk: { + comptime var buf_len = 0; + comptime { + for (client_types.keys()) |key| { + buf_len = @max(buf_len, key.len); + } + } + var buf: [buf_len]u8 = undefined; + break :blk .initBuffer(&buf); + }; + + while (in.peekByte()) |byte| { + if (isUpper(byte)) { + try operation_string.appendBounded(byte); + in.toss(1); + } else break; + } else |err| return err; + + const operation = parse(operation_string.items) orelse { + log.err("Invalid operation: '{s}'", .{operation_string.items}); + return error.InvalidOperation; + }; + + errdefer log.err("Failed to parse {s}", .{operation_string.items}); + + switch (operation) { + .CONNECT => { + return parseConnect(alloc, in); + }, + .PUB => { + @branchHint(.likely); + return parsePub(alloc, in); + }, + .HPUB => { + @branchHint(.likely); + return parseHPub(alloc, in); + }, + .PING => { + try expectStreamBytes(in, "\r\n"); + return .PING; + }, + .PONG => { + try expectStreamBytes(in, "\r\n"); + return .PONG; + }, + .SUB => { + return parseSub(alloc, in); + }, + .UNSUB => { + return parseUnsub(alloc, in); + }, + else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), + } + } +}; + +fn parseConnect(alloc: Allocator, in: *Reader) !Message { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; + + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); + + try in.discardAll(1); // throw away space + + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + // TODO: should be CONNECTION allocator + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return .{ .CONNECT = try res.dupe(alloc) }; +} + +fn parseSub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + const subject = try readSubject(alloc, in, .sub); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + errdefer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + errdefer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .SUB = .{ + .subject = subject, + .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, + .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), + }, + }; +} + +test parseSub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" 1 q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "1", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "$SRV.PING", + .queue_group = null, + .sid = "4", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo.echo q 10\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } +} + +fn parseUnsub(alloc: Allocator, in: *Reader) !Message { + const States = enum { + before_first, + in_first, + after_first, + in_second, + in_end, + }; + + var first: ArrayList(u8) = .empty; + errdefer first.deinit(alloc); + var second: ?ArrayList(u8) = null; + defer if (second) |*s| s.deinit(alloc); + + sw: switch (@as(States, .before_first)) { + .before_first => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_first; + } + continue :sw .in_first; + }, + .in_first => { + const byte = try in.peekByte(); + if (!isWhitespace(byte)) { + try first.append(alloc, byte); + in.toss(1); + continue :sw .in_first; + } + continue :sw .after_first; + }, + .after_first => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_first; + } + second = .empty; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try second.?.append(alloc, byte); + in.toss(1); + continue :sw .in_second; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .UNSUB = .{ + .sid = try first.toOwnedSlice(alloc), + .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, + }, + }; +} + +test parseUnsub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = null, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" 1 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = 1, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn parsePub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + defer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const bytes: usize = + if (third) |t| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, t.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + }; + + const payload: Payload = try .read(alloc, in, bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .PUB = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }; +} + +test parsePub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + // numeric reply subject + { + var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "5", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn parseHPub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + after_third, + in_fourth, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ArrayList(u8) = .empty; + defer third.deinit(alloc); + var fourth: ?ArrayList(u8) = null; + defer if (fourth) |*f| f.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_third; + }, + .after_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_third; + } + fourth = .empty; + continue :sw .in_fourth; + }, + .in_fourth => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = + if (fourth) |f| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, third.items, 10), + try parseUnsigned(usize, f.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + try parseUnsigned(usize, third.items, 10), + }; + + const payload: Payload = try .read(alloc, in, total_bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .HPUB = .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }, + }; +} + +test parseHPub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "6", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { + var subject_list: ArrayList(u8) = .empty; + errdefer subject_list.deinit(alloc); + + // Handle the first character + { + const byte = try in.takeByte(); + if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) + return error.InvalidStream; + + try subject_list.append(alloc, byte); + } + + switch (pub_or_sub) { + .sub => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '>') { + const next_byte = try in.takeByte(); + if (!isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '*') { + const next_byte = try in.peekByte(); + if (next_byte != '.' and !isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + .@"pub" => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '*' or byte == '>') return error.InvalidStream; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + } + + return subject_list.toOwnedSlice(alloc); +} + +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { + if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + +test "parsing a stream" { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ + "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ + ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ + "\nPUB hi 3\r\nfoo\r\n"; + var reader: Reader = .fixed(input); + var arena: ArenaAllocator = .init(alloc); + defer arena.deinit(); + const gpa = arena.allocator(); + + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ + .CONNECT = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + }, + }; + + try expectEqualDeep(expected, msg); + } + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ + .PUB = .{ + .subject = "hi", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + const str = "foo"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }; + try expectEqualDeep(expected, msg); + } +} |
