From 48969283527e0db6b71893b2b3f3bbeb21e522db Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Tue, 6 Jan 2026 21:56:39 -0500 Subject: Major restructuring This makes things much easier to use as a library --- src/Server/message_parser.zig | 1141 ----------------------------------------- 1 file changed, 1141 deletions(-) delete mode 100644 src/Server/message_parser.zig (limited to 'src/Server/message_parser.zig') diff --git a/src/Server/message_parser.zig b/src/Server/message_parser.zig deleted file mode 100644 index fd1b5b1..0000000 --- a/src/Server/message_parser.zig +++ /dev/null @@ -1,1141 +0,0 @@ -const std = @import("std"); -const Allocator = std.mem.Allocator; -const ArenaAllocator = std.heap.ArenaAllocator; -const ArrayList = std.ArrayList; -const StaticStringMap = std.StaticStringMap; - -const Io = std.Io; -const Writer = Io.Writer; -const AllocatingWriter = Writer.Allocating; -const Reader = Io.Reader; - -const ascii = std.ascii; -const isDigit = std.ascii.isDigit; -const isUpper = std.ascii.isUpper; -const isWhitespace = std.ascii.isWhitespace; - -const parseUnsigned = std.fmt.parseUnsigned; - -const log = std.log; - -pub const Payload = struct { - len: u32, - short: [128]u8, - long: ?[]u8, - - pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { - var res: Payload = .{ - .len = @intCast(bytes), - .short = undefined, - .long = null, - }; - - try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); - if (bytes > res.short.len) { - const long = try alloc.alloc(u8, bytes - res.short.len); - errdefer alloc.free(long); - try in.readSliceAll(long); - res.long = long; - } - return res; - } - - pub fn write(self: Payload, out: *Writer) !void { - std.debug.assert(out.buffer.len >= self.short.len); - std.debug.assert(self.len <= self.short.len or self.long != null); - try out.writeAll(self.short[0..@min(self.len, self.short.len)]); - if (self.long) |l| { - try out.writeAll(l); - } - } - - pub fn deinit(self: Payload, alloc: Allocator) void { - if (self.long) |l| { - alloc.free(l); - } - } - - pub fn dupe(self: Payload, alloc: Allocator) !Payload { - var res = self; - if (self.long) |l| { - res.long = try alloc.dupe(u8, l); - } - errdefer if (res.long) |l| alloc.free(l); - return res; - } -}; - -pub const MessageType = @typeInfo(Message).@"union".tag_type.?; - -pub const Message = union(enum) { - INFO: ServerInfo, - CONNECT: Connect, - PUB: Pub, - HPUB: HPub, - SUB: Sub, - UNSUB: Unsub, - MSG: Msg, - HMSG: HMsg, - PING, - PONG, - @"+OK": void, - @"-ERR": []const u8, - pub const ServerInfo = struct { - /// The unique identifier of the NATS server. - server_id: []const u8, - /// The name of the NATS server. - server_name: []const u8, - /// The version of NATS. - version: []const u8, - /// The version of golang the NATS server was built with. - go: []const u8 = "0.0.0", - /// The IP address used to start the NATS server, - /// by default this will be 0.0.0.0 and can be - /// configured with -client_advertise host:port. - host: []const u8 = "0.0.0.0", - /// The port number the NATS server is configured - /// to listen on. - port: u16 = 4222, - /// Whether the server supports headers. - headers: bool = false, - /// Maximum payload size, in bytes, that the server - /// will accept from the client. - max_payload: u64, - /// An integer indicating the protocol version of - /// the server. The server version 1.2.0 sets this - /// to 1 to indicate that it supports the "Echo" - /// feature. - proto: u32 = 1, - }; - pub const Connect = struct { - verbose: bool = false, - pedantic: bool = false, - tls_required: bool = false, - auth_token: ?[]const u8 = null, - user: ?[]const u8 = null, - pass: ?[]const u8 = null, - name: ?[]const u8 = null, - lang: []const u8, - version: []const u8, - protocol: u32, - echo: ?bool = null, - sig: ?[]const u8 = null, - jwt: ?[]const u8 = null, - no_responders: ?bool = null, - headers: ?bool = null, - nkey: ?[]const u8 = null, - - pub fn deinit(self: Connect, alloc: Allocator) void { - if (self.auth_token) |a| alloc.free(a); - if (self.user) |u| alloc.free(u); - if (self.pass) |p| alloc.free(p); - if (self.name) |n| alloc.free(n); - alloc.free(self.lang); - alloc.free(self.version); - if (self.sig) |s| alloc.free(s); - if (self.jwt) |j| alloc.free(j); - if (self.nkey) |n| alloc.free(n); - } - - pub fn dupe(self: Connect, alloc: Allocator) !Connect { - var res = self; - res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; - errdefer if (res.auth_token) |a| alloc.free(a); - res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; - errdefer if (res.user) |u| alloc.free(u); - res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; - errdefer if (res.pass) |p| alloc.free(p); - res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.name) |n| alloc.free(n); - res.lang = try alloc.dupe(u8, self.lang); - errdefer alloc.free(res.lang); - res.version = try alloc.dupe(u8, self.version); - errdefer alloc.free(res.version); - res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; - errdefer if (res.sig) |s| alloc.free(s); - res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; - errdefer if (res.jwt) |j| alloc.free(j); - res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.nkey) |n| alloc.free(n); - return res; - } - }; - pub const Pub = struct { - /// The destination subject to publish to. - subject: []const u8, - /// The reply subject that subscribers can use to send a response back to the publisher/requestor. - reply_to: ?[]const u8 = null, - /// The message payload data. - payload: Payload, - - pub fn deinit(self: Pub, alloc: Allocator) void { - alloc.free(self.subject); - self.payload.deinit(alloc); - if (self.reply_to) |r| alloc.free(r); - } - - pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { - const res: Msg = .{ - .subject = self.subject, - .sid = sid, - .reply_to = self.reply_to, - .payload = self.payload, - }; - return res.dupe(alloc); - } - }; - pub const HPub = struct { - header_bytes: usize, - @"pub": Pub, - - pub fn deinit(self: HPub, alloc: Allocator) void { - self.@"pub".deinit(alloc); - } - - pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { - return .{ - .header_bytes = self.header_bytes, - .msg = try self.@"pub".toMsg(alloc, sid), - }; - } - }; - - pub const HMsg = struct { - header_bytes: usize, - msg: Msg, - - pub fn deinit(self: HMsg, alloc: Allocator) void { - self.msg.deinit(alloc); - } - - pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { - var res = self; - res.msg = try self.msg.dupe(alloc); - errdefer alloc.free(res.msg); - return res; - } - }; - pub const Sub = struct { - /// The subject name to subscribe to. - subject: []const u8, - /// If specified, the subscriber will join this queue group. - queue_group: ?[]const u8, - /// A unique alphanumeric subscription ID, generated by the client. - sid: []const u8, - - pub fn deinit(self: Sub, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.queue_group) |q| alloc.free(q); - } - }; - pub const Unsub = struct { - /// The unique alphanumeric subscription ID of the subject to unsubscribe from. - sid: []const u8, - /// A number of messages to wait for before automatically unsubscribing. - max_msgs: ?usize = null, - - pub fn deinit(self: Unsub, alloc: Allocator) void { - alloc.free(self.sid); - } - }; - pub const Msg = struct { - subject: []const u8, - sid: []const u8, - reply_to: ?[]const u8, - payload: Payload, - - pub fn deinit(self: Msg, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.reply_to) |r| alloc.free(r); - self.payload.deinit(alloc); - } - - pub fn dupe(self: Msg, alloc: Allocator) !Msg { - var res: Msg = undefined; - res.subject = try alloc.dupe(u8, self.subject); - errdefer alloc.free(res.subject); - res.sid = try alloc.dupe(u8, self.sid); - errdefer alloc.free(res.sid); - res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; - errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try self.payload.dupe(alloc); - errdefer alloc.free(res.payload); - return res; - } - }; - - const client_types = StaticStringMap(MessageType).initComptime( - .{ - // {"INFO", .info}, - .{ @tagName(.CONNECT), .CONNECT }, - .{ @tagName(.PUB), .PUB }, - .{ @tagName(.HPUB), .HPUB }, - .{ @tagName(.SUB), .SUB }, - .{ @tagName(.UNSUB), .UNSUB }, - // {"MSG", .msg}, - // {"HMSG", .hmsg}, - .{ @tagName(.PING), .PING }, - .{ @tagName(.PONG), .PONG }, - // {"+OK", .@"+ok"}, - // {"-ERR", .@"-err"}, - }, - ); - fn parseStaticStringMap(input: []const u8) ?MessageType { - return client_types.get(input); - } - - pub const parse = parseStaticStringMap; - - /// An error should be handled by cleaning up this connection. - pub fn next(alloc: Allocator, in: *Reader) !Message { - var operation_string: ArrayList(u8) = blk: { - comptime var buf_len = 0; - comptime { - for (client_types.keys()) |key| { - buf_len = @max(buf_len, key.len); - } - } - var buf: [buf_len]u8 = undefined; - break :blk .initBuffer(&buf); - }; - - while (in.peekByte()) |byte| { - if (isUpper(byte)) { - try operation_string.appendBounded(byte); - in.toss(1); - } else break; - } else |err| return err; - - const operation = parse(operation_string.items) orelse { - log.err("Invalid operation: '{s}'", .{operation_string.items}); - return error.InvalidOperation; - }; - - errdefer log.err("Failed to parse {s}", .{operation_string.items}); - - switch (operation) { - .CONNECT => { - return parseConnect(alloc, in); - }, - .PUB => { - @branchHint(.likely); - return parsePub(alloc, in); - }, - .HPUB => { - @branchHint(.likely); - return parseHPub(alloc, in); - }, - .PING => { - try expectStreamBytes(in, "\r\n"); - return .PING; - }, - .PONG => { - try expectStreamBytes(in, "\r\n"); - return .PONG; - }, - .SUB => { - return parseSub(alloc, in); - }, - .UNSUB => { - return parseUnsub(alloc, in); - }, - else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), - } - } -}; - -fn parseConnect(alloc: Allocator, in: *Reader) !Message { - // for storing the json string - var connect_string_writer_allocating: AllocatingWriter = .init(alloc); - defer connect_string_writer_allocating.deinit(); - var connect_string_writer = &connect_string_writer_allocating.writer; - - // for parsing the json string - var connect_arena_allocator: ArenaAllocator = .init(alloc); - defer connect_arena_allocator.deinit(); - const connect_allocator = connect_arena_allocator.allocator(); - - try in.discardAll(1); // throw away space - - // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); - try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - - const connect_str = try connect_string_writer_allocating.toOwnedSlice(); - defer alloc.free(connect_str); - // TODO: should be CONNECTION allocator - const res = try std.json.parseFromSliceLeaky( - Message.Connect, - connect_allocator, - connect_str, - .{ .allocate = .alloc_always }, - ); - - return .{ .CONNECT = try res.dupe(alloc) }; -} - -fn parseSub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - const subject = try readSubject(alloc, in, .sub); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - errdefer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - errdefer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .SUB = .{ - .subject = subject, - .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, - .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), - }, - }; -} - -test parseSub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo q 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" 1 q 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "1", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" $SRV.PING 4\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "$SRV.PING", - .queue_group = null, - .sid = "4", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo.echo q 10\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo.echo", - .queue_group = "q", - .sid = "10", - }, - }, - res, - ); - } -} - -fn parseUnsub(alloc: Allocator, in: *Reader) !Message { - const States = enum { - before_first, - in_first, - after_first, - in_second, - in_end, - }; - - var first: ArrayList(u8) = .empty; - errdefer first.deinit(alloc); - var second: ?ArrayList(u8) = null; - defer if (second) |*s| s.deinit(alloc); - - sw: switch (@as(States, .before_first)) { - .before_first => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_first; - } - continue :sw .in_first; - }, - .in_first => { - const byte = try in.peekByte(); - if (!isWhitespace(byte)) { - try first.append(alloc, byte); - in.toss(1); - continue :sw .in_first; - } - continue :sw .after_first; - }, - .after_first => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_first; - } - second = .empty; - continue :sw .in_second; - }, - .in_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } - try second.?.append(alloc, byte); - in.toss(1); - continue :sw .in_second; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .UNSUB = .{ - .sid = try first.toOwnedSlice(alloc), - .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, - }, - }; -} - -test parseUnsub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" 1\r\n"); - var res = try parseUnsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = null, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" 1 1\r\n"); - var res = try parseUnsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = 1, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} - -fn parsePub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - defer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const bytes: usize = - if (third) |t| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, t.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - }; - - const payload: Payload = try .read(alloc, in, bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .PUB = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }; -} - -test parsePub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" foo 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - // numeric reply subject - { - var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "5", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} - -fn parseHPub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - after_third, - in_fourth, - in_end, - }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ArrayList(u8) = .empty; - defer third.deinit(alloc); - var fourth: ?ArrayList(u8) = null; - defer if (fourth) |*f| f.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_third; - }, - .after_third => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_third; - } - fourth = .empty; - continue :sw .in_fourth; - }, - .in_fourth => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = - if (fourth) |f| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, third.items, 10), - try parseUnsigned(usize, f.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - try parseUnsigned(usize, third.items, 10), - }; - - const payload: Payload = try .read(alloc, in, total_bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .HPUB = .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }, - }; -} - -test parseHPub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "6", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} - -fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { - var subject_list: ArrayList(u8) = .empty; - errdefer subject_list.deinit(alloc); - - // Handle the first character - { - const byte = try in.takeByte(); - if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) - return error.InvalidStream; - - try subject_list.append(alloc, byte); - } - - switch (pub_or_sub) { - .sub => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '>') { - const next_byte = try in.takeByte(); - if (!isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '*') { - const next_byte = try in.peekByte(); - if (next_byte != '.' and !isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - .@"pub" => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '*' or byte == '>') return error.InvalidStream; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - } - - return subject_list.toOwnedSlice(alloc); -} - -inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { - if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { - @branchHint(.unlikely); - return error.InvalidStream; - } -} - -test "parsing a stream" { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ - "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ - ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ - "\nPUB hi 3\r\nfoo\r\n"; - var reader: Reader = .fixed(input); - var arena: ArenaAllocator = .init(alloc); - defer arena.deinit(); - const gpa = arena.allocator(); - - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ - .CONNECT = .{ - .verbose = false, - .pedantic = false, - .tls_required = false, - .name = "NATS CLI Version v0.2.4", - .lang = "go", - .version = "1.43.0", - .protocol = 1, - .echo = true, - .headers = true, - .no_responders = true, - }, - }; - - try expectEqualDeep(expected, msg); - } - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ - .PUB = .{ - .subject = "hi", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - const str = "foo"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }; - try expectEqualDeep(expected, msg); - } -} -- cgit