From d8488fde4902565f4ac8519565f234918dab6b11 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Thu, 8 Jan 2026 11:48:12 -0500 Subject: support hpub fixed issue where not all data was being sent request reply has a performance issue but technically works --- src/Server/parse.zig | 911 +++++++++------------------------------------------ 1 file changed, 156 insertions(+), 755 deletions(-) (limited to 'src/Server/parse.zig') diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 9035311..6ae99a5 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -44,10 +44,8 @@ pub fn control(in: *Reader) !message.Control { } break :blk min_len; }; - // log.debug("buffered: '{s}'", .{in.buffered()}); std.debug.assert(in.buffer.len >= longest_ctrl); while (true) { - // log.debug("buffered l: '{s}'", .{in.buffered()}); var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); if (iter.next()) |str| { if (client_control.get(str)) |ctrl| { @@ -128,17 +126,19 @@ pub fn @"pub"(in: *Reader) !Message.Pub { const bytes_str = second; const bytes = try parseUnsigned(usize, bytes_str, 10); - // 4 bytes for CRLF on either side of the payload. - _ = try in.take(iter.index + 2); - defer { - _ = in.take(2) catch { - log.warn("very bad parsing issue", .{}); - }; + if (in.buffered().len < iter.index + bytes + 4) { + try in.fill(iter.index + bytes + 4); + // Fill may shift buffer, so we have to retokenize it. + continue; } + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + 2); + defer in.toss(2); return .{ .subject = subject, .reply_to = null, - .payload = try in.take(bytes), + .payload = in.take(bytes) catch unreachable, }; } @@ -147,15 +147,20 @@ pub fn @"pub"(in: *Reader) !Message.Pub { const bytes = try parseUnsigned(usize, bytes_str, 10); if (in.buffered()[iter.index] == '\r') { - if (iter.rest().len > bytes + 2) { - // 4 bytes for CRLF on either side of the payload. - _ = try in.take(iter.index + bytes + 4); - return .{ - .subject = subject, - .reply_to = reply_to, - .payload = iter.rest()[1 .. 1 + bytes], - }; + if (in.buffered().len < iter.index + bytes + 4) { + try in.fill(iter.index + bytes + 4); + // Fill may shift buffer, so we have to retokenize it. + continue; } + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(bytes) catch unreachable, + }; } } } @@ -265,8 +270,14 @@ pub fn sub(in: *Reader) !Message.Sub { if (in.buffered()[iter.index] == '\r') { const sid = second; + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .subject = subject, .queue_group = null, @@ -277,8 +288,14 @@ pub fn sub(in: *Reader) !Message.Sub { const queue_group = second; if (iter.next()) |sid| { if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .subject = subject, .queue_group = queue_group, @@ -367,8 +384,14 @@ pub fn unsub(in: *Reader) !Message.Unsub { if (iter.next()) |sid| { if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .sid = sid, .max_msgs = null, @@ -377,8 +400,16 @@ pub fn unsub(in: *Reader) !Message.Unsub { if (iter.next()) |max_msgs_str| { if (in.buffered()[iter.index] == '\r') { const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); + + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); + return .{ .sid = sid, .max_msgs = max_msgs, @@ -454,39 +485,113 @@ test unsub { pub fn hpub(in: *Reader) !Message.HPub { // TODO: Add pedantic option. // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 - _ = in; - @compileError("TODO"); -} + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |subject| { + if (iter.next()) |second| { + if (iter.next()) |third| { + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const header_bytes_str = second; + const total_bytes_str = third; -test hpub {} + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); -// /// Get the next Message from the input stream. -// pub fn next(alloc: Allocator, in: *Reader) !Message { -// errdefer log.err("Failed to parse {s}", .{operation_string.items}); + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; + } -// switch (operation) { -// .CONNECT => return connect(alloc, in), -// .PUB => { -// @branchHint(.likely); -// return @"pub"(alloc, in); -// }, -// .HPUB => { -// @branchHint(.likely); -// return hpub(alloc, in); -// }, -// .PING => { -// try expectStreamBytes(in, "\r\n"); -// return .PING; -// }, -// .PONG => { -// try expectStreamBytes(in, "\r\n"); -// return .PONG; -// }, -// .SUB => return sub(alloc, in), -// .UNSUB => return unsub(alloc, in), -// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), -// } -// } + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = null, + .payload = in.take(total_bytes) catch unreachable, + }, + }; + } + } + + const reply_to = second; + const header_bytes_str = third; + if (iter.next()) |total_bytes_str| { + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); + + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; + } + + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(total_bytes) catch unreachable, + }, + }; + } + } + } + } + } + } + + try in.fillMore(); + } +} + +test hpub { + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.HPub{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + try hpub(&in.interface), + ); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " foo reply 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.HPub{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + try hpub(&in.interface), + ); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); + } + // TODO: more tests +} pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { // for storing the json string @@ -518,713 +623,9 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { return res.dupe(alloc); } -// pub fn sub(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space -// const subject = try readSubject(alloc, in, .sub); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// errdefer second.deinit(alloc); -// var third: ?ArrayList(u8) = null; -// errdefer if (third) |*t| t.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// return .{ -// .SUB = .{ -// .subject = subject, -// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, -// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), -// }, -// }; -// } - -// pub fn unsub(alloc: Allocator, in: *Reader) !Message { -// const States = enum { -// before_first, -// in_first, -// after_first, -// in_second, -// in_end, -// }; - -// var first: ArrayList(u8) = .empty; -// errdefer first.deinit(alloc); -// var second: ?ArrayList(u8) = null; -// defer if (second) |*s| s.deinit(alloc); - -// sw: switch (@as(States, .before_first)) { -// .before_first => { -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_first; -// } -// continue :sw .in_first; -// }, -// .in_first => { -// const byte = try in.peekByte(); -// if (!isWhitespace(byte)) { -// try first.append(alloc, byte); -// in.toss(1); -// continue :sw .in_first; -// } -// continue :sw .after_first; -// }, -// .after_first => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_first; -// } -// second = .empty; -// continue :sw .in_second; -// }, -// .in_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } -// try second.?.append(alloc, byte); -// in.toss(1); -// continue :sw .in_second; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// return .{ -// .UNSUB = .{ -// .sid = try first.toOwnedSlice(alloc), -// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, -// }, -// }; -// } - -// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space - -// // Parse subject -// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); -// errdefer alloc.free(subject); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// defer second.deinit(alloc); -// var third: ?ArrayList(u8) = null; -// defer if (third) |*t| t.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// // Drop whitespace -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// const reply_to: ?[]const u8, const bytes: usize = -// if (third) |t| .{ -// try alloc.dupe(u8, second.items), -// try parseUnsigned(usize, t.items, 10), -// } else .{ -// null, -// try parseUnsigned(usize, second.items, 10), -// }; - -// const payload: Payload = try .read(alloc, in, bytes); -// errdefer payload.deinit(alloc); -// try expectStreamBytes(in, "\r\n"); - -// return .{ -// .PUB = .{ -// .subject = subject, -// .payload = payload, -// .reply_to = reply_to, -// }, -// }; -// } - -// pub fn hpub(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space - -// // Parse subject -// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); -// errdefer alloc.free(subject); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// after_third, -// in_fourth, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// defer second.deinit(alloc); -// var third: ArrayList(u8) = .empty; -// defer third.deinit(alloc); -// var fourth: ?ArrayList(u8) = null; -// defer if (fourth) |*f| f.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// // Drop whitespace -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_third; -// }, -// .after_third => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_third; -// } -// fourth = .empty; -// continue :sw .in_fourth; -// }, -// .in_fourth => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = -// if (fourth) |f| .{ -// try alloc.dupe(u8, second.items), -// try parseUnsigned(usize, third.items, 10), -// try parseUnsigned(usize, f.items, 10), -// } else .{ -// null, -// try parseUnsigned(usize, second.items, 10), -// try parseUnsigned(usize, third.items, 10), -// }; - -// const payload: Payload = try .read(alloc, in, total_bytes); -// errdefer payload.deinit(alloc); -// try expectStreamBytes(in, "\r\n"); - -// return .{ -// .HPUB = .{ -// .header_bytes = header_bytes, -// .@"pub" = .{ -// .subject = subject, -// .payload = payload, -// .reply_to = reply_to, -// }, -// }, -// }; -// } - -// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { -// var subject_list: ArrayList(u8) = .empty; -// errdefer subject_list.deinit(alloc); - -// // Handle the first character -// { -// const byte = try in.takeByte(); -// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) -// return error.InvalidStream; - -// try subject_list.append(alloc, byte); -// } - -// switch (pub_or_sub) { -// .sub => { -// while (in.takeByte()) |byte| { -// if (isWhitespace(byte)) break; -// if (byte == '.') { -// const next_byte = try in.peekByte(); -// if (next_byte == '.' or isWhitespace(next_byte)) -// return error.InvalidStream; -// } else if (byte == '>') { -// const next_byte = try in.takeByte(); -// if (!isWhitespace(next_byte)) -// return error.InvalidStream; -// } else if (byte == '*') { -// const next_byte = try in.peekByte(); -// if (next_byte != '.' and !isWhitespace(next_byte)) -// return error.InvalidStream; -// } -// try subject_list.append(alloc, byte); -// } else |err| return err; -// }, -// .@"pub" => { -// while (in.takeByte()) |byte| { -// if (isWhitespace(byte)) break; -// if (byte == '*' or byte == '>') return error.InvalidStream; -// if (byte == '.') { -// const next_byte = try in.peekByte(); -// if (next_byte == '.' or isWhitespace(next_byte)) -// return error.InvalidStream; -// } -// try subject_list.append(alloc, byte); -// } else |err| return err; -// }, -// } - -// return subject_list.toOwnedSlice(alloc); -// } - inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { @branchHint(.unlikely); return error.InvalidStream; } } - -// test sub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// { -// var in: Reader = .fixed(" foo 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = null, -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = null, -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo q 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = "q", -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" 1 q 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "1", -// .queue_group = "q", -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" $SRV.PING 4\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "$SRV.PING", -// .queue_group = null, -// .sid = "4", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo.echo q 10\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo.echo", -// .queue_group = "q", -// .sid = "10", -// }, -// }, -// res, -// ); -// } -// } - -// test unsub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" 1\r\n"); -// var res = try unsub(alloc, &in); -// defer res.UNSUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .UNSUB = .{ -// .sid = "1", -// .max_msgs = null, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" 1 1\r\n"); -// var res = try unsub(alloc, &in); -// defer res.UNSUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .UNSUB = .{ -// .sid = "1", -// .max_msgs = 1, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } - -// test @"pub" { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" foo 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = null, -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = "reply.to", -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// // numeric reply subject -// { -// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = "5", -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } - -// test hpub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = null, -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = "reply.to", -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = "6", -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } -- cgit