summaryrefslogtreecommitdiff
path: root/src/Server/parse.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 21:56:39 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 21:59:41 -0500
commit48969283527e0db6b71893b2b3f3bbeb21e522db (patch)
treedee322b777c79ef1cf7133bb65f226488b2cdab7 /src/Server/parse.zig
parentcc036318387cc5c44f2a0a2a1e28d067f3e6bdf6 (diff)
Major restructuring
This makes things much easier to use as a library
Diffstat (limited to 'src/Server/parse.zig')
-rw-r--r--src/Server/parse.zig835
1 files changed, 835 insertions, 0 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
new file mode 100644
index 0000000..d58c0e5
--- /dev/null
+++ b/src/Server/parse.zig
@@ -0,0 +1,835 @@
+const std = @import("std");
+const ArenaAllocator = std.heap.ArenaAllocator;
+const Allocator = std.mem.Allocator;
+const ArrayList = std.ArrayList;
+const Reader = std.Io.Reader;
+const Writer = std.Io.Writer;
+const AllocatingWriter = std.Io.Writer.Allocating;
+const StaticStringMap = std.StaticStringMap;
+
+const log = std.log;
+
+const isDigit = std.ascii.isDigit;
+const isUpper = std.ascii.isUpper;
+const isWhitespace = std.ascii.isWhitespace;
+
+const parseUnsigned = std.fmt.parseUnsigned;
+
+const message = @import("./parse/message.zig");
+pub const Message = message.Message;
+pub const Payload = @import("./parse/Payload.zig");
+
+const client_types = StaticStringMap(message.Control).initComptime(
+ .{
+ // {"INFO", .info},
+ .{ @tagName(.CONNECT), .CONNECT },
+ .{ @tagName(.PUB), .PUB },
+ .{ @tagName(.HPUB), .HPUB },
+ .{ @tagName(.SUB), .SUB },
+ .{ @tagName(.UNSUB), .UNSUB },
+ // {"MSG", .msg},
+ // {"HMSG", .hmsg},
+ .{ @tagName(.PING), .PING },
+ .{ @tagName(.PONG), .PONG },
+ // {"+OK", .@"+ok"},
+ // {"-ERR", .@"-err"},
+ },
+);
+fn parseStaticStringMap(input: []const u8) ?message.Control {
+ return client_types.get(input);
+}
+
+/// Parse a string into its associated MessageType.
+const parse = parseStaticStringMap;
+
+/// Get the next Message from the input stream.
+pub fn next(alloc: Allocator, in: *Reader) !Message {
+ var operation_string: ArrayList(u8) = blk: {
+ comptime var buf_len = 0;
+ comptime {
+ for (client_types.keys()) |key| {
+ buf_len = @max(buf_len, key.len);
+ }
+ }
+ var buf: [buf_len]u8 = undefined;
+ break :blk .initBuffer(&buf);
+ };
+
+ while (in.peekByte()) |byte| {
+ if (isUpper(byte)) {
+ try operation_string.appendBounded(byte);
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ const operation = parse(operation_string.items) orelse {
+ log.err("Invalid operation: '{s}'", .{operation_string.items});
+ return error.InvalidOperation;
+ };
+
+ errdefer log.err("Failed to parse {s}", .{operation_string.items});
+
+ switch (operation) {
+ .CONNECT => return connect(alloc, in),
+ .PUB => {
+ @branchHint(.likely);
+ return @"pub"(alloc, in);
+ },
+ .HPUB => {
+ @branchHint(.likely);
+ return hpub(alloc, in);
+ },
+ .PING => {
+ try expectStreamBytes(in, "\r\n");
+ return .PING;
+ },
+ .PONG => {
+ try expectStreamBytes(in, "\r\n");
+ return .PONG;
+ },
+ .SUB => return sub(alloc, in),
+ .UNSUB => return unsub(alloc, in),
+ else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
+ }
+}
+
+pub fn connect(alloc: Allocator, in: *Reader) !Message {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
+
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
+
+ try in.discardAll(1); // throw away space
+
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return .{ .CONNECT = try res.dupe(alloc) };
+}
+
+pub fn sub(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+ const subject = try readSubject(alloc, in, .sub);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ errdefer second.deinit(alloc);
+ var third: ?ArrayList(u8) = null;
+ errdefer if (third) |*t| t.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ return .{
+ .SUB = .{
+ .subject = subject,
+ .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
+ .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
+ },
+ };
+}
+
+pub fn unsub(alloc: Allocator, in: *Reader) !Message {
+ const States = enum {
+ before_first,
+ in_first,
+ after_first,
+ in_second,
+ in_end,
+ };
+
+ var first: ArrayList(u8) = .empty;
+ errdefer first.deinit(alloc);
+ var second: ?ArrayList(u8) = null;
+ defer if (second) |*s| s.deinit(alloc);
+
+ sw: switch (@as(States, .before_first)) {
+ .before_first => {
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_first;
+ }
+ continue :sw .in_first;
+ },
+ .in_first => {
+ const byte = try in.peekByte();
+ if (!isWhitespace(byte)) {
+ try first.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_first;
+ }
+ continue :sw .after_first;
+ },
+ .after_first => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_first;
+ }
+ second = .empty;
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ }
+ try second.?.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_second;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ return .{
+ .UNSUB = .{
+ .sid = try first.toOwnedSlice(alloc),
+ .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
+ },
+ };
+}
+
+pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+ errdefer alloc.free(subject);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: ?ArrayList(u8) = null;
+ defer if (third) |*t| t.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ const reply_to: ?[]const u8, const bytes: usize =
+ if (third) |t| .{
+ try alloc.dupe(u8, second.items),
+ try parseUnsigned(usize, t.items, 10),
+ } else .{
+ null,
+ try parseUnsigned(usize, second.items, 10),
+ };
+
+ const payload: Payload = try .read(alloc, in, bytes);
+ errdefer payload.deinit(alloc);
+ try expectStreamBytes(in, "\r\n");
+
+ return .{
+ .PUB = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ };
+}
+
+pub fn hpub(alloc: Allocator, in: *Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+ errdefer alloc.free(subject);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ after_third,
+ in_fourth,
+ in_end,
+ };
+
+ var second: ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+ var third: ArrayList(u8) = .empty;
+ defer third.deinit(alloc);
+ var fourth: ?ArrayList(u8) = null;
+ defer if (fourth) |*f| f.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ // Drop whitespace
+ const byte = try in.peekByte();
+ if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .after_third;
+ },
+ .after_third => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_third;
+ }
+ fourth = .empty;
+ continue :sw .in_fourth;
+ },
+ .in_fourth => {
+ for (1..in.buffer.len) |i| {
+ try in.fill(i + 1);
+ if (isWhitespace(in.buffered()[i])) {
+ @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+ in.toss(i);
+ break;
+ }
+ } else return error.EndOfStream;
+ continue :sw .in_end;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
+ if (fourth) |f| .{
+ try alloc.dupe(u8, second.items),
+ try parseUnsigned(usize, third.items, 10),
+ try parseUnsigned(usize, f.items, 10),
+ } else .{
+ null,
+ try parseUnsigned(usize, second.items, 10),
+ try parseUnsigned(usize, third.items, 10),
+ };
+
+ const payload: Payload = try .read(alloc, in, total_bytes);
+ errdefer payload.deinit(alloc);
+ try expectStreamBytes(in, "\r\n");
+
+ return .{
+ .HPUB = .{
+ .header_bytes = header_bytes,
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ },
+ };
+}
+
+fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
+ var subject_list: ArrayList(u8) = .empty;
+ errdefer subject_list.deinit(alloc);
+
+ // Handle the first character
+ {
+ const byte = try in.takeByte();
+ if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
+ return error.InvalidStream;
+
+ try subject_list.append(alloc, byte);
+ }
+
+ switch (pub_or_sub) {
+ .sub => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '>') {
+ const next_byte = try in.takeByte();
+ if (!isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '*') {
+ const next_byte = try in.peekByte();
+ if (next_byte != '.' and !isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ .@"pub" => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '*' or byte == '>') return error.InvalidStream;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ }
+
+ return subject_list.toOwnedSlice(alloc);
+}
+
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+ if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
+
+test sub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo q 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" 1 q 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "1",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" $SRV.PING 4\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "$SRV.PING",
+ .queue_group = null,
+ .sid = "4",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo.echo q 10\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo.echo",
+ .queue_group = "q",
+ .sid = "10",
+ },
+ },
+ res,
+ );
+ }
+}
+
+test unsub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" 1\r\n");
+ var res = try unsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = null,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" 1 1\r\n");
+ var res = try unsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = 1,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+test @"pub" {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ // numeric reply subject
+ {
+ var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "5",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+test hpub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try hpub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try hpub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try hpub(alloc, &in);
+ defer res.HPUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .HPUB = .{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "6",
+ .payload = .{
+ .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+ @memcpy(s[0..str.len], str);
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}