diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 18:45:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 20:43:49 -0500 |
| commit | b87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch) | |
| tree | 00613d0d3f7178d0c5b974ce04a752443e9a816e /src/Server/message_parser.zig | |
| parent | 025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff) | |
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/Server/message_parser.zig')
| -rw-r--r-- | src/Server/message_parser.zig | 1141 |
1 files changed, 1141 insertions, 0 deletions
diff --git a/src/Server/message_parser.zig b/src/Server/message_parser.zig new file mode 100644 index 0000000..fd1b5b1 --- /dev/null +++ b/src/Server/message_parser.zig @@ -0,0 +1,1141 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const ArenaAllocator = std.heap.ArenaAllocator; +const ArrayList = std.ArrayList; +const StaticStringMap = std.StaticStringMap; + +const Io = std.Io; +const Writer = Io.Writer; +const AllocatingWriter = Writer.Allocating; +const Reader = Io.Reader; + +const ascii = std.ascii; +const isDigit = std.ascii.isDigit; +const isUpper = std.ascii.isUpper; +const isWhitespace = std.ascii.isWhitespace; + +const parseUnsigned = std.fmt.parseUnsigned; + +const log = std.log; + +pub const Payload = struct { + len: u32, + short: [128]u8, + long: ?[]u8, + + pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; + } + + pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } + } + + pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } + } + + pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; + } +}; + +pub const MessageType = @typeInfo(Message).@"union".tag_type.?; + +pub const Message = union(enum) { + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + + pub fn deinit(self: Connect, alloc: Allocator) void { + if (self.auth_token) |a| alloc.free(a); + if (self.user) |u| alloc.free(u); + if (self.pass) |p| alloc.free(p); + if (self.name) |n| alloc.free(n); + alloc.free(self.lang); + alloc.free(self.version); + if (self.sig) |s| alloc.free(s); + if (self.jwt) |j| alloc.free(j); + if (self.nkey) |n| alloc.free(n); + } + + pub fn dupe(self: Connect, alloc: Allocator) !Connect { + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.lang = try alloc.dupe(u8, self.lang); + errdefer alloc.free(res.lang); + res.version = try alloc.dupe(u8, self.version); + errdefer alloc.free(res.version); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; + } + }; + pub const Pub = struct { + /// The destination subject to publish to. + subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. + reply_to: ?[]const u8 = null, + /// The message payload data. + payload: Payload, + + pub fn deinit(self: Pub, alloc: Allocator) void { + alloc.free(self.subject); + self.payload.deinit(alloc); + if (self.reply_to) |r| alloc.free(r); + } + + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + const res: Msg = .{ + .subject = self.subject, + .sid = sid, + .reply_to = self.reply_to, + .payload = self.payload, + }; + return res.dupe(alloc); + } + }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); + return res; + } + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + + pub fn deinit(self: Sub, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } + }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: Allocator) void { + alloc.free(self.sid); + } + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, + payload: Payload, + + pub fn deinit(self: Msg, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.reply_to) |r| alloc.free(r); + self.payload.deinit(alloc); + } + + pub fn dupe(self: Msg, alloc: Allocator) !Msg { + var res: Msg = undefined; + res.subject = try alloc.dupe(u8, self.subject); + errdefer alloc.free(res.subject); + res.sid = try alloc.dupe(u8, self.sid); + errdefer alloc.free(res.sid); + res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; + errdefer if (res.reply_to) |r| alloc.free(r); + res.payload = try self.payload.dupe(alloc); + errdefer alloc.free(res.payload); + return res; + } + }; + + const client_types = StaticStringMap(MessageType).initComptime( + .{ + // {"INFO", .info}, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, + // {"MSG", .msg}, + // {"HMSG", .hmsg}, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, + // {"+OK", .@"+ok"}, + // {"-ERR", .@"-err"}, + }, + ); + fn parseStaticStringMap(input: []const u8) ?MessageType { + return client_types.get(input); + } + + pub const parse = parseStaticStringMap; + + /// An error should be handled by cleaning up this connection. + pub fn next(alloc: Allocator, in: *Reader) !Message { + var operation_string: ArrayList(u8) = blk: { + comptime var buf_len = 0; + comptime { + for (client_types.keys()) |key| { + buf_len = @max(buf_len, key.len); + } + } + var buf: [buf_len]u8 = undefined; + break :blk .initBuffer(&buf); + }; + + while (in.peekByte()) |byte| { + if (isUpper(byte)) { + try operation_string.appendBounded(byte); + in.toss(1); + } else break; + } else |err| return err; + + const operation = parse(operation_string.items) orelse { + log.err("Invalid operation: '{s}'", .{operation_string.items}); + return error.InvalidOperation; + }; + + errdefer log.err("Failed to parse {s}", .{operation_string.items}); + + switch (operation) { + .CONNECT => { + return parseConnect(alloc, in); + }, + .PUB => { + @branchHint(.likely); + return parsePub(alloc, in); + }, + .HPUB => { + @branchHint(.likely); + return parseHPub(alloc, in); + }, + .PING => { + try expectStreamBytes(in, "\r\n"); + return .PING; + }, + .PONG => { + try expectStreamBytes(in, "\r\n"); + return .PONG; + }, + .SUB => { + return parseSub(alloc, in); + }, + .UNSUB => { + return parseUnsub(alloc, in); + }, + else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), + } + } +}; + +fn parseConnect(alloc: Allocator, in: *Reader) !Message { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; + + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); + + try in.discardAll(1); // throw away space + + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + // TODO: should be CONNECTION allocator + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return .{ .CONNECT = try res.dupe(alloc) }; +} + +fn parseSub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + const subject = try readSubject(alloc, in, .sub); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + errdefer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + errdefer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .SUB = .{ + .subject = subject, + .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, + .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), + }, + }; +} + +test parseSub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" 1 q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "1", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "$SRV.PING", + .queue_group = null, + .sid = "4", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo.echo q 10\r\n"); + var res = try parseSub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } +} + +fn parseUnsub(alloc: Allocator, in: *Reader) !Message { + const States = enum { + before_first, + in_first, + after_first, + in_second, + in_end, + }; + + var first: ArrayList(u8) = .empty; + errdefer first.deinit(alloc); + var second: ?ArrayList(u8) = null; + defer if (second) |*s| s.deinit(alloc); + + sw: switch (@as(States, .before_first)) { + .before_first => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_first; + } + continue :sw .in_first; + }, + .in_first => { + const byte = try in.peekByte(); + if (!isWhitespace(byte)) { + try first.append(alloc, byte); + in.toss(1); + continue :sw .in_first; + } + continue :sw .after_first; + }, + .after_first => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_first; + } + second = .empty; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try second.?.append(alloc, byte); + in.toss(1); + continue :sw .in_second; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .UNSUB = .{ + .sid = try first.toOwnedSlice(alloc), + .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, + }, + }; +} + +test parseUnsub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = null, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" 1 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = 1, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn parsePub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + defer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const bytes: usize = + if (third) |t| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, t.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + }; + + const payload: Payload = try .read(alloc, in, bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .PUB = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }; +} + +test parsePub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + // numeric reply subject + { + var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "5", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn parseHPub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + after_third, + in_fourth, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ArrayList(u8) = .empty; + defer third.deinit(alloc); + var fourth: ?ArrayList(u8) = null; + defer if (fourth) |*f| f.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_third; + }, + .after_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_third; + } + fourth = .empty; + continue :sw .in_fourth; + }, + .in_fourth => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = + if (fourth) |f| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, third.items, 10), + try parseUnsigned(usize, f.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + try parseUnsigned(usize, third.items, 10), + }; + + const payload: Payload = try .read(alloc, in, total_bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .HPUB = .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }, + }; +} + +test parseHPub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "6", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { + var subject_list: ArrayList(u8) = .empty; + errdefer subject_list.deinit(alloc); + + // Handle the first character + { + const byte = try in.takeByte(); + if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) + return error.InvalidStream; + + try subject_list.append(alloc, byte); + } + + switch (pub_or_sub) { + .sub => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '>') { + const next_byte = try in.takeByte(); + if (!isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '*') { + const next_byte = try in.peekByte(); + if (next_byte != '.' and !isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + .@"pub" => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '*' or byte == '>') return error.InvalidStream; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + } + + return subject_list.toOwnedSlice(alloc); +} + +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { + if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + +test "parsing a stream" { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ + "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ + ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ + "\nPUB hi 3\r\nfoo\r\n"; + var reader: Reader = .fixed(input); + var arena: ArenaAllocator = .init(alloc); + defer arena.deinit(); + const gpa = arena.allocator(); + + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ + .CONNECT = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + }, + }; + + try expectEqualDeep(expected, msg); + } + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ + .PUB = .{ + .subject = "hi", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + const str = "foo"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }; + try expectEqualDeep(expected, msg); + } +} |
