summaryrefslogtreecommitdiff
path: root/src/Server/parse.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-08 11:48:12 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-08 16:47:52 -0500
commitd8488fde4902565f4ac8519565f234918dab6b11 (patch)
treeb50e523a32f4ccab4c5ec093a5dce24250f5b7b8 /src/Server/parse.zig
parent45feccbad8c7306c15137a6003f3df1183d9c2a9 (diff)
support hpub
fixed issue where not all data was being sent request reply has a performance issue but technically works
Diffstat (limited to 'src/Server/parse.zig')
-rw-r--r--src/Server/parse.zig911
1 files changed, 156 insertions, 755 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 9035311..6ae99a5 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -44,10 +44,8 @@ pub fn control(in: *Reader) !message.Control {
}
break :blk min_len;
};
- // log.debug("buffered: '{s}'", .{in.buffered()});
std.debug.assert(in.buffer.len >= longest_ctrl);
while (true) {
- // log.debug("buffered l: '{s}'", .{in.buffered()});
var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
if (iter.next()) |str| {
if (client_control.get(str)) |ctrl| {
@@ -128,17 +126,19 @@ pub fn @"pub"(in: *Reader) !Message.Pub {
const bytes_str = second;
const bytes = try parseUnsigned(usize, bytes_str, 10);
- // 4 bytes for CRLF on either side of the payload.
- _ = try in.take(iter.index + 2);
- defer {
- _ = in.take(2) catch {
- log.warn("very bad parsing issue", .{});
- };
+ if (in.buffered().len < iter.index + bytes + 4) {
+ try in.fill(iter.index + bytes + 4);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
}
+
+ // 4 bytes for CRLF on either side of the payload.
+ in.toss(iter.index + 2);
+ defer in.toss(2);
return .{
.subject = subject,
.reply_to = null,
- .payload = try in.take(bytes),
+ .payload = in.take(bytes) catch unreachable,
};
}
@@ -147,15 +147,20 @@ pub fn @"pub"(in: *Reader) !Message.Pub {
const bytes = try parseUnsigned(usize, bytes_str, 10);
if (in.buffered()[iter.index] == '\r') {
- if (iter.rest().len > bytes + 2) {
- // 4 bytes for CRLF on either side of the payload.
- _ = try in.take(iter.index + bytes + 4);
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .payload = iter.rest()[1 .. 1 + bytes],
- };
+ if (in.buffered().len < iter.index + bytes + 4) {
+ try in.fill(iter.index + bytes + 4);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
}
+
+ // 4 bytes for CRLF on either side of the payload.
+ in.toss(iter.index + 2);
+ defer in.toss(2);
+ return .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = in.take(bytes) catch unreachable,
+ };
}
}
}
@@ -265,8 +270,14 @@ pub fn sub(in: *Reader) !Message.Sub {
if (in.buffered()[iter.index] == '\r') {
const sid = second;
+ if (in.buffered().len < iter.index + 2) {
+ try in.fill(iter.index + 2);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
+ }
+
// 2 bytes for CRLF at the end.
- _ = try in.take(iter.index + 2);
+ in.toss(iter.index + 2);
return .{
.subject = subject,
.queue_group = null,
@@ -277,8 +288,14 @@ pub fn sub(in: *Reader) !Message.Sub {
const queue_group = second;
if (iter.next()) |sid| {
if (in.buffered()[iter.index] == '\r') {
+ if (in.buffered().len < iter.index + 2) {
+ try in.fill(iter.index + 2);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
+ }
+
// 2 bytes for CRLF at the end.
- _ = try in.take(iter.index + 2);
+ in.toss(iter.index + 2);
return .{
.subject = subject,
.queue_group = queue_group,
@@ -367,8 +384,14 @@ pub fn unsub(in: *Reader) !Message.Unsub {
if (iter.next()) |sid| {
if (in.buffered()[iter.index] == '\r') {
+ if (in.buffered().len < iter.index + 2) {
+ try in.fill(iter.index + 2);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
+ }
+
// 2 bytes for CRLF at the end.
- _ = try in.take(iter.index + 2);
+ in.toss(iter.index + 2);
return .{
.sid = sid,
.max_msgs = null,
@@ -377,8 +400,16 @@ pub fn unsub(in: *Reader) !Message.Unsub {
if (iter.next()) |max_msgs_str| {
if (in.buffered()[iter.index] == '\r') {
const max_msgs = try parseUnsigned(usize, max_msgs_str, 10);
+
+ if (in.buffered().len < iter.index + 2) {
+ try in.fill(iter.index + 2);
+ // Fill may shift buffer, so we have to retokenize it.
+ continue;
+ }
+
// 2 bytes for CRLF at the end.
- _ = try in.take(iter.index + 2);
+ in.toss(iter.index + 2);
+
return .{
.sid = sid,
.max_msgs = max_msgs,
@@ -454,39 +485,113 @@ test unsub {
pub fn hpub(in: *Reader) !Message.HPub {
// TODO: Add pedantic option.
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
- _ = in;
- @compileError("TODO");
-}
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |subject| {
+ if (iter.next()) |second| {
+ if (iter.next()) |third| {
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const header_bytes_str = second;
+ const total_bytes_str = third;
-test hpub {}
+ const header_bytes = try parseUnsigned(usize, header_bytes_str, 10);
+ const total_bytes = try parseUnsigned(usize, total_bytes_str, 10);
-// /// Get the next Message from the input stream.
-// pub fn next(alloc: Allocator, in: *Reader) !Message {
-// errdefer log.err("Failed to parse {s}", .{operation_string.items});
+ if (in.buffered().len < iter.index + total_bytes + 4) {
+ try in.fill(iter.index + total_bytes + 4);
+ continue;
+ }
-// switch (operation) {
-// .CONNECT => return connect(alloc, in),
-// .PUB => {
-// @branchHint(.likely);
-// return @"pub"(alloc, in);
-// },
-// .HPUB => {
-// @branchHint(.likely);
-// return hpub(alloc, in);
-// },
-// .PING => {
-// try expectStreamBytes(in, "\r\n");
-// return .PING;
-// },
-// .PONG => {
-// try expectStreamBytes(in, "\r\n");
-// return .PONG;
-// },
-// .SUB => return sub(alloc, in),
-// .UNSUB => return unsub(alloc, in),
-// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
-// }
-// }
+ // 4 bytes for CRLF on either side of headers and payload.
+ in.toss(iter.index + 2);
+ defer in.toss(2);
+ return .{
+ .header_bytes = header_bytes,
+ .@"pub" = .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = in.take(total_bytes) catch unreachable,
+ },
+ };
+ }
+ }
+
+ const reply_to = second;
+ const header_bytes_str = third;
+ if (iter.next()) |total_bytes_str| {
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const header_bytes = try parseUnsigned(usize, header_bytes_str, 10);
+ const total_bytes = try parseUnsigned(usize, total_bytes_str, 10);
+
+ if (in.buffered().len < iter.index + total_bytes + 4) {
+ try in.fill(iter.index + total_bytes + 4);
+ continue;
+ }
+
+ // 4 bytes for CRLF on either side of headers and payload.
+ in.toss(iter.index + 2);
+ defer in.toss(2);
+ return .{
+ .header_bytes = header_bytes,
+ .@"pub" = .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = in.take(total_bytes) catch unreachable,
+ },
+ };
+ }
+ }
+ }
+ }
+ }
+ }
+
+ try in.fillMore();
+ }
+}
+
+test hpub {
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.HPub{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ },
+ },
+ try hpub(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = " foo reply 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.HPub{
+ .header_bytes = 22,
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply",
+ .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
+ },
+ },
+ try hpub(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
+ }
+ // TODO: more tests
+}
pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
// for storing the json string
@@ -518,713 +623,9 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
return res.dupe(alloc);
}
-// pub fn sub(alloc: Allocator, in: *Reader) !Message {
-// try in.discardAll(1); // throw away space
-// const subject = try readSubject(alloc, in, .sub);
-
-// const States = enum {
-// before_second,
-// in_second,
-// after_second,
-// in_third,
-// in_end,
-// };
-
-// var second: ArrayList(u8) = .empty;
-// errdefer second.deinit(alloc);
-// var third: ?ArrayList(u8) = null;
-// errdefer if (third) |*t| t.deinit(alloc);
-
-// sw: switch (@as(States, .before_second)) {
-// .before_second => {
-// const byte = try in.peekByte();
-// if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .before_second;
-// }
-// continue :sw .in_second;
-// },
-// .in_second => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .after_second;
-// },
-// .after_second => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// } else if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .after_second;
-// }
-// third = .empty;
-// continue :sw .in_third;
-// },
-// .in_third => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .in_end;
-// },
-// .in_end => {
-// try expectStreamBytes(in, "\r\n");
-// },
-// }
-
-// return .{
-// .SUB = .{
-// .subject = subject,
-// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
-// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
-// },
-// };
-// }
-
-// pub fn unsub(alloc: Allocator, in: *Reader) !Message {
-// const States = enum {
-// before_first,
-// in_first,
-// after_first,
-// in_second,
-// in_end,
-// };
-
-// var first: ArrayList(u8) = .empty;
-// errdefer first.deinit(alloc);
-// var second: ?ArrayList(u8) = null;
-// defer if (second) |*s| s.deinit(alloc);
-
-// sw: switch (@as(States, .before_first)) {
-// .before_first => {
-// const byte = try in.peekByte();
-// if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .before_first;
-// }
-// continue :sw .in_first;
-// },
-// .in_first => {
-// const byte = try in.peekByte();
-// if (!isWhitespace(byte)) {
-// try first.append(alloc, byte);
-// in.toss(1);
-// continue :sw .in_first;
-// }
-// continue :sw .after_first;
-// },
-// .after_first => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// } else if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .after_first;
-// }
-// second = .empty;
-// continue :sw .in_second;
-// },
-// .in_second => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// }
-// try second.?.append(alloc, byte);
-// in.toss(1);
-// continue :sw .in_second;
-// },
-// .in_end => {
-// try expectStreamBytes(in, "\r\n");
-// },
-// }
-
-// return .{
-// .UNSUB = .{
-// .sid = try first.toOwnedSlice(alloc),
-// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
-// },
-// };
-// }
-
-// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
-// try in.discardAll(1); // throw away space
-
-// // Parse subject
-// const subject: []const u8 = try readSubject(alloc, in, .@"pub");
-// errdefer alloc.free(subject);
-
-// const States = enum {
-// before_second,
-// in_second,
-// after_second,
-// in_third,
-// in_end,
-// };
-
-// var second: ArrayList(u8) = .empty;
-// defer second.deinit(alloc);
-// var third: ?ArrayList(u8) = null;
-// defer if (third) |*t| t.deinit(alloc);
-
-// sw: switch (@as(States, .before_second)) {
-// .before_second => {
-// // Drop whitespace
-// const byte = try in.peekByte();
-// if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .before_second;
-// }
-// continue :sw .in_second;
-// },
-// .in_second => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .after_second;
-// },
-// .after_second => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// } else if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .after_second;
-// }
-// third = .empty;
-// continue :sw .in_third;
-// },
-// .in_third => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .in_end;
-// },
-// .in_end => {
-// try expectStreamBytes(in, "\r\n");
-// },
-// }
-
-// const reply_to: ?[]const u8, const bytes: usize =
-// if (third) |t| .{
-// try alloc.dupe(u8, second.items),
-// try parseUnsigned(usize, t.items, 10),
-// } else .{
-// null,
-// try parseUnsigned(usize, second.items, 10),
-// };
-
-// const payload: Payload = try .read(alloc, in, bytes);
-// errdefer payload.deinit(alloc);
-// try expectStreamBytes(in, "\r\n");
-
-// return .{
-// .PUB = .{
-// .subject = subject,
-// .payload = payload,
-// .reply_to = reply_to,
-// },
-// };
-// }
-
-// pub fn hpub(alloc: Allocator, in: *Reader) !Message {
-// try in.discardAll(1); // throw away space
-
-// // Parse subject
-// const subject: []const u8 = try readSubject(alloc, in, .@"pub");
-// errdefer alloc.free(subject);
-
-// const States = enum {
-// before_second,
-// in_second,
-// after_second,
-// in_third,
-// after_third,
-// in_fourth,
-// in_end,
-// };
-
-// var second: ArrayList(u8) = .empty;
-// defer second.deinit(alloc);
-// var third: ArrayList(u8) = .empty;
-// defer third.deinit(alloc);
-// var fourth: ?ArrayList(u8) = null;
-// defer if (fourth) |*f| f.deinit(alloc);
-
-// sw: switch (@as(States, .before_second)) {
-// .before_second => {
-// // Drop whitespace
-// const byte = try in.peekByte();
-// if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .before_second;
-// }
-// continue :sw .in_second;
-// },
-// .in_second => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .after_second;
-// },
-// .after_second => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// } else if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .after_second;
-// }
-// third = .empty;
-// continue :sw .in_third;
-// },
-// .in_third => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .after_third;
-// },
-// .after_third => {
-// const byte = try in.peekByte();
-// if (byte == '\r') {
-// continue :sw .in_end;
-// } else if (isWhitespace(byte)) {
-// in.toss(1);
-// continue :sw .after_third;
-// }
-// fourth = .empty;
-// continue :sw .in_fourth;
-// },
-// .in_fourth => {
-// for (1..in.buffer.len) |i| {
-// try in.fill(i + 1);
-// if (isWhitespace(in.buffered()[i])) {
-// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
-// in.toss(i);
-// break;
-// }
-// } else return error.EndOfStream;
-// continue :sw .in_end;
-// },
-// .in_end => {
-// try expectStreamBytes(in, "\r\n");
-// },
-// }
-
-// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
-// if (fourth) |f| .{
-// try alloc.dupe(u8, second.items),
-// try parseUnsigned(usize, third.items, 10),
-// try parseUnsigned(usize, f.items, 10),
-// } else .{
-// null,
-// try parseUnsigned(usize, second.items, 10),
-// try parseUnsigned(usize, third.items, 10),
-// };
-
-// const payload: Payload = try .read(alloc, in, total_bytes);
-// errdefer payload.deinit(alloc);
-// try expectStreamBytes(in, "\r\n");
-
-// return .{
-// .HPUB = .{
-// .header_bytes = header_bytes,
-// .@"pub" = .{
-// .subject = subject,
-// .payload = payload,
-// .reply_to = reply_to,
-// },
-// },
-// };
-// }
-
-// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
-// var subject_list: ArrayList(u8) = .empty;
-// errdefer subject_list.deinit(alloc);
-
-// // Handle the first character
-// {
-// const byte = try in.takeByte();
-// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
-// return error.InvalidStream;
-
-// try subject_list.append(alloc, byte);
-// }
-
-// switch (pub_or_sub) {
-// .sub => {
-// while (in.takeByte()) |byte| {
-// if (isWhitespace(byte)) break;
-// if (byte == '.') {
-// const next_byte = try in.peekByte();
-// if (next_byte == '.' or isWhitespace(next_byte))
-// return error.InvalidStream;
-// } else if (byte == '>') {
-// const next_byte = try in.takeByte();
-// if (!isWhitespace(next_byte))
-// return error.InvalidStream;
-// } else if (byte == '*') {
-// const next_byte = try in.peekByte();
-// if (next_byte != '.' and !isWhitespace(next_byte))
-// return error.InvalidStream;
-// }
-// try subject_list.append(alloc, byte);
-// } else |err| return err;
-// },
-// .@"pub" => {
-// while (in.takeByte()) |byte| {
-// if (isWhitespace(byte)) break;
-// if (byte == '*' or byte == '>') return error.InvalidStream;
-// if (byte == '.') {
-// const next_byte = try in.peekByte();
-// if (next_byte == '.' or isWhitespace(next_byte))
-// return error.InvalidStream;
-// }
-// try subject_list.append(alloc, byte);
-// } else |err| return err;
-// },
-// }
-
-// return subject_list.toOwnedSlice(alloc);
-// }
-
inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
return error.InvalidStream;
}
}
-
-// test sub {
-// const alloc = std.testing.allocator;
-// const expectEqualDeep = std.testing.expectEqualDeep;
-// {
-// var in: Reader = .fixed(" foo 1\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "foo",
-// .queue_group = null,
-// .sid = "1",
-// },
-// },
-// res,
-// );
-// }
-// {
-// var in: Reader = .fixed(" foo 1\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "foo",
-// .queue_group = null,
-// .sid = "1",
-// },
-// },
-// res,
-// );
-// }
-// {
-// var in: Reader = .fixed(" foo q 1\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "foo",
-// .queue_group = "q",
-// .sid = "1",
-// },
-// },
-// res,
-// );
-// }
-// {
-// var in: Reader = .fixed(" 1 q 1\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "1",
-// .queue_group = "q",
-// .sid = "1",
-// },
-// },
-// res,
-// );
-// }
-// {
-// var in: Reader = .fixed(" $SRV.PING 4\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "$SRV.PING",
-// .queue_group = null,
-// .sid = "4",
-// },
-// },
-// res,
-// );
-// }
-// {
-// var in: Reader = .fixed(" foo.echo q 10\r\n");
-// var res = try sub(alloc, &in);
-// defer res.SUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .SUB = .{
-// .subject = "foo.echo",
-// .queue_group = "q",
-// .sid = "10",
-// },
-// },
-// res,
-// );
-// }
-// }
-
-// test unsub {
-// const alloc = std.testing.allocator;
-// const expectEqualDeep = std.testing.expectEqualDeep;
-// const expectEqual = std.testing.expectEqual;
-// {
-// var in: Reader = .fixed(" 1\r\n");
-// var res = try unsub(alloc, &in);
-// defer res.UNSUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .UNSUB = .{
-// .sid = "1",
-// .max_msgs = null,
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-
-// {
-// var in: Reader = .fixed(" 1 1\r\n");
-// var res = try unsub(alloc, &in);
-// defer res.UNSUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .UNSUB = .{
-// .sid = "1",
-// .max_msgs = 1,
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-// }
-
-// test @"pub" {
-// const alloc = std.testing.allocator;
-// const expectEqualDeep = std.testing.expectEqualDeep;
-// const expectEqual = std.testing.expectEqual;
-// {
-// var in: Reader = .fixed(" foo 3\r\nbar\r\n");
-// var res = try @"pub"(alloc, &in);
-// defer res.PUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .PUB = .{
-// .subject = "foo",
-// .reply_to = null,
-// .payload = .{
-// .len = 3,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// @memcpy(s[0..3], "bar");
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-
-// {
-// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
-// var res = try @"pub"(alloc, &in);
-// defer res.PUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .PUB = .{
-// .subject = "foo",
-// .reply_to = "reply.to",
-// .payload = .{
-// .len = 3,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// @memcpy(s[0..3], "bar");
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-
-// // numeric reply subject
-// {
-// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
-// var res = try @"pub"(alloc, &in);
-// defer res.PUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .PUB = .{
-// .subject = "foo",
-// .reply_to = "5",
-// .payload = .{
-// .len = 3,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// @memcpy(s[0..3], "bar");
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-// }
-
-// test hpub {
-// const alloc = std.testing.allocator;
-// const expectEqualDeep = std.testing.expectEqualDeep;
-// const expectEqual = std.testing.expectEqual;
-// {
-// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
-// var res = try hpub(alloc, &in);
-// defer res.HPUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .HPUB = .{
-// .header_bytes = 22,
-// .@"pub" = .{
-// .subject = "foo",
-// .reply_to = null,
-// .payload = .{
-// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
-// @memcpy(s[0..str.len], str);
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-
-// {
-// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
-// var res = try hpub(alloc, &in);
-// defer res.HPUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .HPUB = .{
-// .header_bytes = 22,
-// .@"pub" = .{
-// .subject = "foo",
-// .reply_to = "reply.to",
-// .payload = .{
-// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
-// @memcpy(s[0..str.len], str);
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-
-// {
-// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
-// var res = try hpub(alloc, &in);
-// defer res.HPUB.deinit(alloc);
-// try expectEqualDeep(
-// Message{
-// .HPUB = .{
-// .header_bytes = 22,
-// .@"pub" = .{
-// .subject = "foo",
-// .reply_to = "6",
-// .payload = .{
-// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
-// .short = blk: {
-// var s: [128]u8 = undefined;
-// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
-// @memcpy(s[0..str.len], str);
-// break :blk s;
-// },
-// .long = null,
-// },
-// },
-// },
-// },
-// res,
-// );
-// try expectEqual(0, in.buffered().len);
-// }
-// }