diff options
| -rw-r--r-- | src/Server.zig | 66 | ||||
| -rw-r--r-- | src/Server/Client.zig | 68 | ||||
| -rw-r--r-- | src/Server/parse.zig | 911 |
3 files changed, 186 insertions, 859 deletions
diff --git a/src/Server.zig b/src/Server.zig index b5f9ee9..4ae959f 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -281,9 +281,15 @@ fn publishMessage( } }; - _ = pub_or_hpub; + const hpubmsg = switch (pub_or_hpub) { + .@"pub" => {}, + .hpub => try parse.hpub(source_client.from_client), + }; - const msg = try parse.@"pub"(source_client.from_client); + const msg: Message.Pub = switch (pub_or_hpub) { + .@"pub" => try parse.@"pub"(source_client.from_client), + .hpub => hpubmsg.@"pub", + }; // const subject = switch (pub_or_hpub) { // .PUB => |pb| pb.subject, @@ -297,6 +303,10 @@ fn publishMessage( var published_queue_sub_idxs: ArrayList(usize) = .empty; defer published_queue_sub_idxs.deinit(alloc); + var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc); + defer line_writer_allocating.deinit(); + var line_writer = &line_writer_allocating.writer; + subs: for (0..server.subscriptions.items.len) |i| { const subscription = server.subscriptions.items[i]; if (subjectMatches(subscription.subject, msg.subject)) { @@ -313,45 +323,29 @@ fn publishMessage( try published_queue_sub_idxs.append(alloc, i); } - const m = msg.toMsg(subscription.sid); - var msg_line_buf: [1024]u8 = undefined; - var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf); - - // try self.to_client.print( - // , - - // ); - // try m.payload.write(self.to_client); - // try self.to_client.print("\r\n", .{}); - try msg_line_writer.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, + line_writer_allocating.clearRetainingCapacity(); + + switch (pub_or_hpub) { + .@"pub" => _ = try line_writer.write("MSG "), + .hpub => _ = try line_writer.write("HMSG "), + } + try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid }); + if (msg.reply_to) |reply_to| { + try line_writer.print("{s} ", .{reply_to}); + } + switch (pub_or_hpub) { + .hpub => { + try line_writer.print("{d} ", .{hpubmsg.header_bytes}); }, - ); + else => {}, + } + try line_writer.print("{d}\r\n", .{msg.payload.len}); try subscription.queue_lock.lock(io); defer subscription.queue_lock.unlock(io); - try subscription.queue.putAll(io, msg_line_writer.buffered()); - try subscription.queue.putAll(io, m.payload); + try subscription.queue.putAll(io, line_writer.buffered()); + try subscription.queue.putAll(io, msg.payload); try subscription.queue.putAll(io, "\r\n"); - - // switch (msg) { - // .PUB => |pb| { - // try subscription.queue.putOne(io, .{ - // .MSG = try pb.toMsg(subscription.alloc, subscription.sid), - // }); - // }, - // .HPUB => |hp| { - // try subscription.queue.putOne(io, .{ - // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid), - // }); - // }, - // else => unreachable, - // } } } diff --git a/src/Server/Client.zig b/src/Server/Client.zig index a7dd007..26be79f 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -6,11 +6,6 @@ const Queue = std.Io.Queue; const Client = @This(); -pub const Msgs = union(enum) { - MSG: Message.Msg, - HMSG: Message.HMsg, -}; - connect: ?Message.Connect, // Byte queue for this client to receive. recv_queue: *Queue(u8), @@ -48,69 +43,6 @@ pub fn start(self: *Client, io: std.Io) !void { self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); try self.to_client.flush(); } - // while (true) { - // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - // .msgs => |len_err| { - // @branchHint(.likely); - // const msgs = msgs_buf[0..try len_err]; - // for (0..msgs.len) |i| { - // const msg = msgs[i]; - // defer switch (msg) { - // .MSG => |m| m.deinit(self.alloc), - // .HMSG => |h| h.deinit(self.alloc), - // }; - // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { - // .MSG => |m| { - // m.deinit(self.alloc); - // }, - // .HMSG => |h| { - // h.deinit(self.alloc); - // }, - // }; - // switch (msg) { - // .MSG => |m| { - - // }, - // .HMSG => |hmsg| { - // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ - // hmsg.msg.subject, - // hmsg.msg.sid, - // hmsg.msg.reply_to orelse "", - // hmsg.header_bytes, - // hmsg.msg.payload.len, - // }); - // try hmsg.msg.payload.write(self.to_client); - // try self.to_client.print("\r\n", .{}); - // }, - // } - // } - // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; - // }, - // .proto => |msg_err| { - // @branchHint(.unlikely); - // const msg = try msg_err; - // switch (msg) { - // .@"+OK" => { - // _ = try self.to_client.write("+OK\r\n"); - // }, - // .PONG => { - // _ = try self.to_client.write("PONG\r\n"); - // }, - // .INFO => |info| { - - // }, - // .@"-ERR" => |s| { - // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - // }, - // else => |m| { - // std.debug.panic("unimplemented write: {any}\n", .{m}); - // }, - // } - // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - // }, - // } - // try self.to_client.flush(); - // } } pub fn send(self: *Client, io: std.Io, msg: []const u8) !void { diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 9035311..6ae99a5 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -44,10 +44,8 @@ pub fn control(in: *Reader) !message.Control { } break :blk min_len; }; - // log.debug("buffered: '{s}'", .{in.buffered()}); std.debug.assert(in.buffer.len >= longest_ctrl); while (true) { - // log.debug("buffered l: '{s}'", .{in.buffered()}); var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); if (iter.next()) |str| { if (client_control.get(str)) |ctrl| { @@ -128,17 +126,19 @@ pub fn @"pub"(in: *Reader) !Message.Pub { const bytes_str = second; const bytes = try parseUnsigned(usize, bytes_str, 10); - // 4 bytes for CRLF on either side of the payload. - _ = try in.take(iter.index + 2); - defer { - _ = in.take(2) catch { - log.warn("very bad parsing issue", .{}); - }; + if (in.buffered().len < iter.index + bytes + 4) { + try in.fill(iter.index + bytes + 4); + // Fill may shift buffer, so we have to retokenize it. + continue; } + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + 2); + defer in.toss(2); return .{ .subject = subject, .reply_to = null, - .payload = try in.take(bytes), + .payload = in.take(bytes) catch unreachable, }; } @@ -147,15 +147,20 @@ pub fn @"pub"(in: *Reader) !Message.Pub { const bytes = try parseUnsigned(usize, bytes_str, 10); if (in.buffered()[iter.index] == '\r') { - if (iter.rest().len > bytes + 2) { - // 4 bytes for CRLF on either side of the payload. - _ = try in.take(iter.index + bytes + 4); - return .{ - .subject = subject, - .reply_to = reply_to, - .payload = iter.rest()[1 .. 1 + bytes], - }; + if (in.buffered().len < iter.index + bytes + 4) { + try in.fill(iter.index + bytes + 4); + // Fill may shift buffer, so we have to retokenize it. + continue; } + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(bytes) catch unreachable, + }; } } } @@ -265,8 +270,14 @@ pub fn sub(in: *Reader) !Message.Sub { if (in.buffered()[iter.index] == '\r') { const sid = second; + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .subject = subject, .queue_group = null, @@ -277,8 +288,14 @@ pub fn sub(in: *Reader) !Message.Sub { const queue_group = second; if (iter.next()) |sid| { if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .subject = subject, .queue_group = queue_group, @@ -367,8 +384,14 @@ pub fn unsub(in: *Reader) !Message.Unsub { if (iter.next()) |sid| { if (in.buffered()[iter.index] == '\r') { + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); return .{ .sid = sid, .max_msgs = null, @@ -377,8 +400,16 @@ pub fn unsub(in: *Reader) !Message.Unsub { if (iter.next()) |max_msgs_str| { if (in.buffered()[iter.index] == '\r') { const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); + + if (in.buffered().len < iter.index + 2) { + try in.fill(iter.index + 2); + // Fill may shift buffer, so we have to retokenize it. + continue; + } + // 2 bytes for CRLF at the end. - _ = try in.take(iter.index + 2); + in.toss(iter.index + 2); + return .{ .sid = sid, .max_msgs = max_msgs, @@ -454,39 +485,113 @@ test unsub { pub fn hpub(in: *Reader) !Message.HPub { // TODO: Add pedantic option. // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 - _ = in; - @compileError("TODO"); -} + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |subject| { + if (iter.next()) |second| { + if (iter.next()) |third| { + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const header_bytes_str = second; + const total_bytes_str = third; -test hpub {} + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); -// /// Get the next Message from the input stream. -// pub fn next(alloc: Allocator, in: *Reader) !Message { -// errdefer log.err("Failed to parse {s}", .{operation_string.items}); + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; + } -// switch (operation) { -// .CONNECT => return connect(alloc, in), -// .PUB => { -// @branchHint(.likely); -// return @"pub"(alloc, in); -// }, -// .HPUB => { -// @branchHint(.likely); -// return hpub(alloc, in); -// }, -// .PING => { -// try expectStreamBytes(in, "\r\n"); -// return .PING; -// }, -// .PONG => { -// try expectStreamBytes(in, "\r\n"); -// return .PONG; -// }, -// .SUB => return sub(alloc, in), -// .UNSUB => return unsub(alloc, in), -// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), -// } -// } + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = null, + .payload = in.take(total_bytes) catch unreachable, + }, + }; + } + } + + const reply_to = second; + const header_bytes_str = third; + if (iter.next()) |total_bytes_str| { + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const header_bytes = try parseUnsigned(usize, header_bytes_str, 10); + const total_bytes = try parseUnsigned(usize, total_bytes_str, 10); + + if (in.buffered().len < iter.index + total_bytes + 4) { + try in.fill(iter.index + total_bytes + 4); + continue; + } + + // 4 bytes for CRLF on either side of headers and payload. + in.toss(iter.index + 2); + defer in.toss(2); + return .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .reply_to = reply_to, + .payload = in.take(total_bytes) catch unreachable, + }, + }; + } + } + } + } + } + } + + try in.fillMore(); + } +} + +test hpub { + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.HPub{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + try hpub(&in.interface), + ); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = " foo reply 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.HPub{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply", + .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", + }, + }, + try hpub(&in.interface), + ); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); + } + // TODO: more tests +} pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { // for storing the json string @@ -518,713 +623,9 @@ pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { return res.dupe(alloc); } -// pub fn sub(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space -// const subject = try readSubject(alloc, in, .sub); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// errdefer second.deinit(alloc); -// var third: ?ArrayList(u8) = null; -// errdefer if (third) |*t| t.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// return .{ -// .SUB = .{ -// .subject = subject, -// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, -// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), -// }, -// }; -// } - -// pub fn unsub(alloc: Allocator, in: *Reader) !Message { -// const States = enum { -// before_first, -// in_first, -// after_first, -// in_second, -// in_end, -// }; - -// var first: ArrayList(u8) = .empty; -// errdefer first.deinit(alloc); -// var second: ?ArrayList(u8) = null; -// defer if (second) |*s| s.deinit(alloc); - -// sw: switch (@as(States, .before_first)) { -// .before_first => { -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_first; -// } -// continue :sw .in_first; -// }, -// .in_first => { -// const byte = try in.peekByte(); -// if (!isWhitespace(byte)) { -// try first.append(alloc, byte); -// in.toss(1); -// continue :sw .in_first; -// } -// continue :sw .after_first; -// }, -// .after_first => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_first; -// } -// second = .empty; -// continue :sw .in_second; -// }, -// .in_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } -// try second.?.append(alloc, byte); -// in.toss(1); -// continue :sw .in_second; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// return .{ -// .UNSUB = .{ -// .sid = try first.toOwnedSlice(alloc), -// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, -// }, -// }; -// } - -// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space - -// // Parse subject -// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); -// errdefer alloc.free(subject); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// defer second.deinit(alloc); -// var third: ?ArrayList(u8) = null; -// defer if (third) |*t| t.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// // Drop whitespace -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// const reply_to: ?[]const u8, const bytes: usize = -// if (third) |t| .{ -// try alloc.dupe(u8, second.items), -// try parseUnsigned(usize, t.items, 10), -// } else .{ -// null, -// try parseUnsigned(usize, second.items, 10), -// }; - -// const payload: Payload = try .read(alloc, in, bytes); -// errdefer payload.deinit(alloc); -// try expectStreamBytes(in, "\r\n"); - -// return .{ -// .PUB = .{ -// .subject = subject, -// .payload = payload, -// .reply_to = reply_to, -// }, -// }; -// } - -// pub fn hpub(alloc: Allocator, in: *Reader) !Message { -// try in.discardAll(1); // throw away space - -// // Parse subject -// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); -// errdefer alloc.free(subject); - -// const States = enum { -// before_second, -// in_second, -// after_second, -// in_third, -// after_third, -// in_fourth, -// in_end, -// }; - -// var second: ArrayList(u8) = .empty; -// defer second.deinit(alloc); -// var third: ArrayList(u8) = .empty; -// defer third.deinit(alloc); -// var fourth: ?ArrayList(u8) = null; -// defer if (fourth) |*f| f.deinit(alloc); - -// sw: switch (@as(States, .before_second)) { -// .before_second => { -// // Drop whitespace -// const byte = try in.peekByte(); -// if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .before_second; -// } -// continue :sw .in_second; -// }, -// .in_second => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_second; -// }, -// .after_second => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_second; -// } -// third = .empty; -// continue :sw .in_third; -// }, -// .in_third => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .after_third; -// }, -// .after_third => { -// const byte = try in.peekByte(); -// if (byte == '\r') { -// continue :sw .in_end; -// } else if (isWhitespace(byte)) { -// in.toss(1); -// continue :sw .after_third; -// } -// fourth = .empty; -// continue :sw .in_fourth; -// }, -// .in_fourth => { -// for (1..in.buffer.len) |i| { -// try in.fill(i + 1); -// if (isWhitespace(in.buffered()[i])) { -// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); -// in.toss(i); -// break; -// } -// } else return error.EndOfStream; -// continue :sw .in_end; -// }, -// .in_end => { -// try expectStreamBytes(in, "\r\n"); -// }, -// } - -// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = -// if (fourth) |f| .{ -// try alloc.dupe(u8, second.items), -// try parseUnsigned(usize, third.items, 10), -// try parseUnsigned(usize, f.items, 10), -// } else .{ -// null, -// try parseUnsigned(usize, second.items, 10), -// try parseUnsigned(usize, third.items, 10), -// }; - -// const payload: Payload = try .read(alloc, in, total_bytes); -// errdefer payload.deinit(alloc); -// try expectStreamBytes(in, "\r\n"); - -// return .{ -// .HPUB = .{ -// .header_bytes = header_bytes, -// .@"pub" = .{ -// .subject = subject, -// .payload = payload, -// .reply_to = reply_to, -// }, -// }, -// }; -// } - -// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { -// var subject_list: ArrayList(u8) = .empty; -// errdefer subject_list.deinit(alloc); - -// // Handle the first character -// { -// const byte = try in.takeByte(); -// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) -// return error.InvalidStream; - -// try subject_list.append(alloc, byte); -// } - -// switch (pub_or_sub) { -// .sub => { -// while (in.takeByte()) |byte| { -// if (isWhitespace(byte)) break; -// if (byte == '.') { -// const next_byte = try in.peekByte(); -// if (next_byte == '.' or isWhitespace(next_byte)) -// return error.InvalidStream; -// } else if (byte == '>') { -// const next_byte = try in.takeByte(); -// if (!isWhitespace(next_byte)) -// return error.InvalidStream; -// } else if (byte == '*') { -// const next_byte = try in.peekByte(); -// if (next_byte != '.' and !isWhitespace(next_byte)) -// return error.InvalidStream; -// } -// try subject_list.append(alloc, byte); -// } else |err| return err; -// }, -// .@"pub" => { -// while (in.takeByte()) |byte| { -// if (isWhitespace(byte)) break; -// if (byte == '*' or byte == '>') return error.InvalidStream; -// if (byte == '.') { -// const next_byte = try in.peekByte(); -// if (next_byte == '.' or isWhitespace(next_byte)) -// return error.InvalidStream; -// } -// try subject_list.append(alloc, byte); -// } else |err| return err; -// }, -// } - -// return subject_list.toOwnedSlice(alloc); -// } - inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { @branchHint(.unlikely); return error.InvalidStream; } } - -// test sub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// { -// var in: Reader = .fixed(" foo 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = null, -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = null, -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo q 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo", -// .queue_group = "q", -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" 1 q 1\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "1", -// .queue_group = "q", -// .sid = "1", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" $SRV.PING 4\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "$SRV.PING", -// .queue_group = null, -// .sid = "4", -// }, -// }, -// res, -// ); -// } -// { -// var in: Reader = .fixed(" foo.echo q 10\r\n"); -// var res = try sub(alloc, &in); -// defer res.SUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .SUB = .{ -// .subject = "foo.echo", -// .queue_group = "q", -// .sid = "10", -// }, -// }, -// res, -// ); -// } -// } - -// test unsub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" 1\r\n"); -// var res = try unsub(alloc, &in); -// defer res.UNSUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .UNSUB = .{ -// .sid = "1", -// .max_msgs = null, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" 1 1\r\n"); -// var res = try unsub(alloc, &in); -// defer res.UNSUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .UNSUB = .{ -// .sid = "1", -// .max_msgs = 1, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } - -// test @"pub" { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" foo 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = null, -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = "reply.to", -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// // numeric reply subject -// { -// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); -// var res = try @"pub"(alloc, &in); -// defer res.PUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .PUB = .{ -// .subject = "foo", -// .reply_to = "5", -// .payload = .{ -// .len = 3, -// .short = blk: { -// var s: [128]u8 = undefined; -// @memcpy(s[0..3], "bar"); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } - -// test hpub { -// const alloc = std.testing.allocator; -// const expectEqualDeep = std.testing.expectEqualDeep; -// const expectEqual = std.testing.expectEqual; -// { -// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = null, -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = "reply.to", -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } - -// { -// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); -// var res = try hpub(alloc, &in); -// defer res.HPUB.deinit(alloc); -// try expectEqualDeep( -// Message{ -// .HPUB = .{ -// .header_bytes = 22, -// .@"pub" = .{ -// .subject = "foo", -// .reply_to = "6", -// .payload = .{ -// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, -// .short = blk: { -// var s: [128]u8 = undefined; -// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; -// @memcpy(s[0..str.len], str); -// break :blk s; -// }, -// .long = null, -// }, -// }, -// }, -// }, -// res, -// ); -// try expectEqual(0, in.buffered().len); -// } -// } |
