diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-07 17:26:10 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-07 23:19:19 -0500 |
| commit | 96a3705069cf33a00ded143f876734c2a045cf1e (patch) | |
| tree | 61fca89bddd554fb7f8e4800eddde965f7163b0b /src/Server | |
| parent | e2a60c9427bfaf63149b4692459e86749553f755 (diff) | |
starting zero alloc parsing
Diffstat (limited to 'src/Server')
| -rw-r--r-- | src/Server/Client.zig | 192 | ||||
| -rw-r--r-- | src/Server/Payload.zig (renamed from src/Server/parse/Payload.zig) | 0 | ||||
| -rw-r--r-- | src/Server/message.zig (renamed from src/Server/parse/message.zig) | 30 | ||||
| -rw-r--r-- | src/Server/parse.zig | 1653 | ||||
| -rw-r--r-- | src/Server/serialize.zig | 6 |
5 files changed, 985 insertions, 896 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig index 9ec928c..047f38d 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -11,29 +11,23 @@ pub const Msgs = union(enum) { }; connect: ?Message.Connect, -// Used to own messages that we receive in our queues. -alloc: std.mem.Allocator, - -// Messages for this client to receive. -recv_queue: *Queue(Message), -msg_queue: *Queue(Msgs), +// Byte queue for this client to receive. +recv_queue: *Queue(u8), +// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes). +recv_queue_write_lock: std.Io.Mutex = .init, from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - alloc: std.mem.Allocator, - recv_queue: *Queue(Message), - msg_queue: *Queue(Msgs), + recv_queue: *Queue(u8), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, - .alloc = alloc, .recv_queue = recv_queue, - .msg_queue = msg_queue, .from_client = in, .to_client = out, }; @@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { } pub fn start(self: *Client, io: std.Io) !void { - var msgs_buf: [1024]Msgs = undefined; - - var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); - errdefer _ = recv_msgs_task.cancel(io) catch {}; - - var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - errdefer _ = recv_proto_task.cancel(io) catch {}; - + std.debug.assert(self.to_client.buffer.len > 0); + std.debug.assert(self.to_client.end == 0); while (true) { - switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |len_err| { - @branchHint(.likely); - const msgs = msgs_buf[0..try len_err]; - for (0..msgs.len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(self.alloc), - .HMSG => |h| h.deinit(self.alloc), - }; - errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(self.alloc); - }, - .HMSG => |h| { - h.deinit(self.alloc); - }, - }; - switch (msg) { - .MSG => |m| { - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - }, - ); - try m.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - .HMSG => |hmsg| { - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - }); - try hmsg.msg.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - } - } - recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; - }, - .proto => |msg_err| { - @branchHint(.unlikely); - const msg = try msg_err; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } - recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - }, - } + self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); try self.to_client.flush(); } + // while (true) { + // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + // .msgs => |len_err| { + // @branchHint(.likely); + // const msgs = msgs_buf[0..try len_err]; + // for (0..msgs.len) |i| { + // const msg = msgs[i]; + // defer switch (msg) { + // .MSG => |m| m.deinit(self.alloc), + // .HMSG => |h| h.deinit(self.alloc), + // }; + // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + // .MSG => |m| { + // m.deinit(self.alloc); + // }, + // .HMSG => |h| { + // h.deinit(self.alloc); + // }, + // }; + // switch (msg) { + // .MSG => |m| { + // try self.to_client.print( + // "MSG {s} {s} {s} {d}\r\n", + // .{ + // m.subject, + // m.sid, + // m.reply_to orelse "", + // m.payload.len, + // }, + // ); + // try m.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // .HMSG => |hmsg| { + // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ + // hmsg.msg.subject, + // hmsg.msg.sid, + // hmsg.msg.reply_to orelse "", + // hmsg.header_bytes, + // hmsg.msg.payload.len, + // }); + // try hmsg.msg.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // } + // } + // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; + // }, + // .proto => |msg_err| { + // @branchHint(.unlikely); + // const msg = try msg_err; + // switch (msg) { + // .@"+OK" => { + // _ = try self.to_client.write("+OK\r\n"); + // }, + // .PONG => { + // _ = try self.to_client.write("PONG\r\n"); + // }, + // .INFO => |info| { + // _ = try self.to_client.write("INFO "); + // try std.json.Stringify.value(info, .{}, self.to_client); + // _ = try self.to_client.write("\r\n"); + // }, + // .@"-ERR" => |s| { + // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + // }, + // else => |m| { + // std.debug.panic("unimplemented write: {any}\n", .{m}); + // }, + // } + // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + // }, + // } + // try self.to_client.flush(); + // } } -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - switch (msg) { - .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), - .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), - else => try self.recv_queue.putOne(io, msg), - } +pub fn send(self: *Client, io: std.Io, msg: []const u8) !void { + try self.recv_queue.putAll(io, msg); } test send { @@ -148,19 +136,15 @@ test send { var buf: [1024]u8 = undefined; break :blk &buf; }); - var recv_queue: Queue(Message) = .init(&.{}); - var msgs_queue: Queue(Msgs) = .init(blk: { - var buf: [1]Msgs = undefined; - break :blk &buf; - }); - var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + var recv_queue: Queue(u8) = .init(&.{}); + var client: Client = .init(null, &recv_queue, undefined, &to_client); defer client.deinit(gpa); var c_task = try io.concurrent(Client.start, .{ &client, io }); defer c_task.cancel(io) catch {}; { - try client.send(io, .PONG); + try client.send(io, "PONG\r\n"); // Wait for the concurrent client task to write to the writer try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); diff --git a/src/Server/parse/Payload.zig b/src/Server/Payload.zig index b512a81..b512a81 100644 --- a/src/Server/parse/Payload.zig +++ b/src/Server/Payload.zig diff --git a/src/Server/parse/message.zig b/src/Server/message.zig index c8a308f..f22410f 100644 --- a/src/Server/parse/message.zig +++ b/src/Server/message.zig @@ -106,13 +106,7 @@ pub const Message = union(enum) { /// The reply subject that subscribers can use to send a response back to the publisher/requestor. reply_to: ?[]const u8 = null, /// The message payload data. - payload: Payload, - - pub fn deinit(self: Pub, alloc: Allocator) void { - alloc.free(self.subject); - self.payload.deinit(alloc); - if (self.reply_to) |r| alloc.free(r); - } + payload: []const u8, pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { const res: Msg = .{ @@ -183,26 +177,6 @@ pub const Message = union(enum) { subject: []const u8, sid: []const u8, reply_to: ?[]const u8, - payload: Payload, - - pub fn deinit(self: Msg, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.reply_to) |r| alloc.free(r); - self.payload.deinit(alloc); - } - - pub fn dupe(self: Msg, alloc: Allocator) !Msg { - var res: Msg = undefined; - res.subject = try alloc.dupe(u8, self.subject); - errdefer alloc.free(res.subject); - res.sid = try alloc.dupe(u8, self.sid); - errdefer alloc.free(res.sid); - res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; - errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try self.payload.dupe(alloc); - errdefer alloc.free(res.payload); - return res; - } + payload: []const u8, }; }; diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 70dd2bc..6e013c4 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -15,9 +15,9 @@ const isWhitespace = std.ascii.isWhitespace; const parseUnsigned = std.fmt.parseUnsigned; -const message = @import("./parse/message.zig"); +const message = @import("./message.zig"); pub const Message = message.Message; -pub const Payload = @import("./parse/Payload.zig"); +pub const Payload = @import("./Payload.zig"); const client_control = StaticStringMap(message.Control).initComptime( .{ @@ -35,801 +35,926 @@ const client_control = StaticStringMap(message.Control).initComptime( // {"-ERR", .@"-err"}, }, ); -fn parseStaticStringMap(input: []const u8) ?message.Control { - return client_control.get(input); -} - -/// Parse a string into its associated MessageType. -const parse = parseStaticStringMap; -/// Get the next Message from the input stream. -pub fn next(alloc: Allocator, in: *Reader) !Message { - var operation_string: ArrayList(u8) = blk: { - comptime var buf_len = 0; - comptime { - for (client_control.keys()) |key| { - buf_len = @max(buf_len, key.len); - } +pub fn control(in: *Reader) !message.Control { + const longest_ctrl = comptime blk: { + var min_len = 0; + for (client_control.keys()) |ctrl| { + min_len = @max(ctrl.len, min_len); } - var buf: [buf_len]u8 = undefined; - break :blk .initBuffer(&buf); - }; - - while (in.peekByte()) |byte| { - if (isUpper(byte)) { - try operation_string.appendBounded(byte); - in.toss(1); - } else break; - } else |err| return err; - - const operation = parse(operation_string.items) orelse { - log.err("Invalid operation: '{s}'", .{operation_string.items}); - return error.InvalidOperation; - }; - - errdefer log.err("Failed to parse {s}", .{operation_string.items}); - - switch (operation) { - .CONNECT => return connect(alloc, in), - .PUB => { - @branchHint(.likely); - return @"pub"(alloc, in); - }, - .HPUB => { - @branchHint(.likely); - return hpub(alloc, in); - }, - .PING => { - try expectStreamBytes(in, "\r\n"); - return .PING; - }, - .PONG => { - try expectStreamBytes(in, "\r\n"); - return .PONG; - }, - .SUB => return sub(alloc, in), - .UNSUB => return unsub(alloc, in), - else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), - } -} - -pub fn connect(alloc: Allocator, in: *Reader) !Message { - // for storing the json string - var connect_string_writer_allocating: AllocatingWriter = .init(alloc); - defer connect_string_writer_allocating.deinit(); - var connect_string_writer = &connect_string_writer_allocating.writer; - - // for parsing the json string - var connect_arena_allocator: ArenaAllocator = .init(alloc); - defer connect_arena_allocator.deinit(); - const connect_allocator = connect_arena_allocator.allocator(); - - try in.discardAll(1); // throw away space - - // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); - try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - - const connect_str = try connect_string_writer_allocating.toOwnedSlice(); - defer alloc.free(connect_str); - const res = try std.json.parseFromSliceLeaky( - Message.Connect, - connect_allocator, - connect_str, - .{ .allocate = .alloc_always }, - ); - - return .{ .CONNECT = try res.dupe(alloc) }; -} - -pub fn sub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - const subject = try readSubject(alloc, in, .sub); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - errdefer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - errdefer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .SUB = .{ - .subject = subject, - .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, - .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), - }, - }; -} - -pub fn unsub(alloc: Allocator, in: *Reader) !Message { - const States = enum { - before_first, - in_first, - after_first, - in_second, - in_end, - }; - - var first: ArrayList(u8) = .empty; - errdefer first.deinit(alloc); - var second: ?ArrayList(u8) = null; - defer if (second) |*s| s.deinit(alloc); - - sw: switch (@as(States, .before_first)) { - .before_first => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_first; - } - continue :sw .in_first; - }, - .in_first => { - const byte = try in.peekByte(); - if (!isWhitespace(byte)) { - try first.append(alloc, byte); - in.toss(1); - continue :sw .in_first; - } - continue :sw .after_first; - }, - .after_first => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_first; - } - second = .empty; - continue :sw .in_second; - }, - .in_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } - try second.?.append(alloc, byte); - in.toss(1); - continue :sw .in_second; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .UNSUB = .{ - .sid = try first.toOwnedSlice(alloc), - .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, - }, - }; -} - -pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - defer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const bytes: usize = - if (third) |t| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, t.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - }; - - const payload: Payload = try .read(alloc, in, bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .PUB = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }; -} - -pub fn hpub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - after_third, - in_fourth, - in_end, + break :blk min_len; }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ArrayList(u8) = .empty; - defer third.deinit(alloc); - var fourth: ?ArrayList(u8) = null; - defer if (fourth) |*f| f.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; + std.debug.assert(in.buffer.len >= longest_ctrl); + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + if (iter.next()) |str| { + if (client_control.get(str)) |ctrl| { + in.toss(str.len); + return ctrl; + } else if (str.len >= longest_ctrl) { + return error.InvalidControl; } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_third; - }, - .after_third => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_third; - } - fourth = .empty; - continue :sw .in_fourth; - }, - .in_fourth => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = - if (fourth) |f| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, third.items, 10), - try parseUnsigned(usize, f.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - try parseUnsigned(usize, third.items, 10), - }; - - const payload: Payload = try .read(alloc, in, total_bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .HPUB = .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }, - }; -} - -fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { - var subject_list: ArrayList(u8) = .empty; - errdefer subject_list.deinit(alloc); - - // Handle the first character - { - const byte = try in.takeByte(); - if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) - return error.InvalidStream; - - try subject_list.append(alloc, byte); - } - - switch (pub_or_sub) { - .sub => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '>') { - const next_byte = try in.takeByte(); - if (!isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '*') { - const next_byte = try in.peekByte(); - if (next_byte != '.' and !isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - .@"pub" => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '*' or byte == '>') return error.InvalidStream; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - } - - return subject_list.toOwnedSlice(alloc); -} - -inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { - if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { - @branchHint(.unlikely); - return error.InvalidStream; + } + try in.fillMore(); } } -test sub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; +test control { { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PUB " }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, " ", in.interface.buffered()); } { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PUB" }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } { - var in: Reader = .fixed(" foo q 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PU" }, + .{ .buffer = "B" }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } { - var in: Reader = .fixed(" 1 q 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "1", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PIN" }, + .{ .buffer = "G\r\n" }, + }); + try std.testing.expectEqual(.PING, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "\r\n", in.interface.buffered()); } { - var in: Reader = .fixed(" $SRV.PING 4\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "$SRV.PING", - .queue_group = null, - .sid = "4", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "CONNECT" }, + }); + try std.testing.expectEqual(.CONNECT, try control(&in.interface)); } { - var in: Reader = .fixed(" foo.echo q 10\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo.echo", - .queue_group = "q", - .sid = "10", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "CONNECC" }, + }); + try std.testing.expectError(error.InvalidControl, control(&in.interface)); } } -test unsub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" 1\r\n"); - var res = try unsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = null, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } +/// The return value is owned by the reader passed to this function. +/// Operations that modify the readers buffer invalidates this value. +pub fn @"pub"(in: *Reader) !Message.Pub { + // TODO: Add pedantic option. + // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 + + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |subject| { + if (iter.next()) |second| { + if (in.buffered()[iter.index] == '\r') { + const bytes_str = second; + const bytes = try parseUnsigned(usize, bytes_str, 10); + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + bytes + 4); + return .{ + .subject = subject, + .reply_to = null, + .payload = iter.rest()[1 .. 1 + bytes], + }; + } - { - var in: Reader = .fixed(" 1 1\r\n"); - var res = try unsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = 1, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); + const reply_to = second; + if (iter.next()) |bytes_str| { + const bytes = try parseUnsigned(usize, bytes_str, 10); + + if (in.buffered()[iter.index] == '\r') { + if (iter.rest().len > bytes) { + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + bytes + 4); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = iter.rest()[1 .. 1 + bytes], + }; + } + } + } + } + } + + try in.fillMore(); } } test @"pub" { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; { - var in: Reader = .fixed(" foo 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo bar 2\r\nhi\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = "bar", + .payload = "hi", }, - res, + try @"pub"(&in.interface), ); - try expectEqual(0, in.buffered().len); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } - { - var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo 2\r\nhi\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = null, + .payload = "hi", }, - res, + try @"pub"(&in.interface), ); - try expectEqual(0, in.buffered().len); - } - - // numeric reply subject - { - var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "5", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } } -test hpub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "6", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} +// /// Get the next Message from the input stream. +// pub fn next(alloc: Allocator, in: *Reader) !Message { +// errdefer log.err("Failed to parse {s}", .{operation_string.items}); + +// switch (operation) { +// .CONNECT => return connect(alloc, in), +// .PUB => { +// @branchHint(.likely); +// return @"pub"(alloc, in); +// }, +// .HPUB => { +// @branchHint(.likely); +// return hpub(alloc, in); +// }, +// .PING => { +// try expectStreamBytes(in, "\r\n"); +// return .PING; +// }, +// .PONG => { +// try expectStreamBytes(in, "\r\n"); +// return .PONG; +// }, +// .SUB => return sub(alloc, in), +// .UNSUB => return unsub(alloc, in), +// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), +// } +// } + +// pub fn connect(alloc: Allocator, in: *Reader) !Message { +// // for storing the json string +// var connect_string_writer_allocating: AllocatingWriter = .init(alloc); +// defer connect_string_writer_allocating.deinit(); +// var connect_string_writer = &connect_string_writer_allocating.writer; + +// // for parsing the json string +// var connect_arena_allocator: ArenaAllocator = .init(alloc); +// defer connect_arena_allocator.deinit(); +// const connect_allocator = connect_arena_allocator.allocator(); + +// try in.discardAll(1); // throw away space + +// // Should read the next JSON object to the fixed buffer writer. +// _ = try in.streamDelimiter(connect_string_writer, '}'); +// try connect_string_writer.writeByte('}'); +// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + +// const connect_str = try connect_string_writer_allocating.toOwnedSlice(); +// defer alloc.free(connect_str); +// const res = try std.json.parseFromSliceLeaky( +// Message.Connect, +// connect_allocator, +// connect_str, +// .{ .allocate = .alloc_always }, +// ); + +// return .{ .CONNECT = try res.dupe(alloc) }; +// } + +// pub fn sub(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space +// const subject = try readSubject(alloc, in, .sub); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// errdefer second.deinit(alloc); +// var third: ?ArrayList(u8) = null; +// errdefer if (third) |*t| t.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// return .{ +// .SUB = .{ +// .subject = subject, +// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, +// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), +// }, +// }; +// } + +// pub fn unsub(alloc: Allocator, in: *Reader) !Message { +// const States = enum { +// before_first, +// in_first, +// after_first, +// in_second, +// in_end, +// }; + +// var first: ArrayList(u8) = .empty; +// errdefer first.deinit(alloc); +// var second: ?ArrayList(u8) = null; +// defer if (second) |*s| s.deinit(alloc); + +// sw: switch (@as(States, .before_first)) { +// .before_first => { +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_first; +// } +// continue :sw .in_first; +// }, +// .in_first => { +// const byte = try in.peekByte(); +// if (!isWhitespace(byte)) { +// try first.append(alloc, byte); +// in.toss(1); +// continue :sw .in_first; +// } +// continue :sw .after_first; +// }, +// .after_first => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_first; +// } +// second = .empty; +// continue :sw .in_second; +// }, +// .in_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } +// try second.?.append(alloc, byte); +// in.toss(1); +// continue :sw .in_second; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// return .{ +// .UNSUB = .{ +// .sid = try first.toOwnedSlice(alloc), +// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, +// }, +// }; +// } + +// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space + +// // Parse subject +// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); +// errdefer alloc.free(subject); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// defer second.deinit(alloc); +// var third: ?ArrayList(u8) = null; +// defer if (third) |*t| t.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// // Drop whitespace +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// const reply_to: ?[]const u8, const bytes: usize = +// if (third) |t| .{ +// try alloc.dupe(u8, second.items), +// try parseUnsigned(usize, t.items, 10), +// } else .{ +// null, +// try parseUnsigned(usize, second.items, 10), +// }; + +// const payload: Payload = try .read(alloc, in, bytes); +// errdefer payload.deinit(alloc); +// try expectStreamBytes(in, "\r\n"); + +// return .{ +// .PUB = .{ +// .subject = subject, +// .payload = payload, +// .reply_to = reply_to, +// }, +// }; +// } + +// pub fn hpub(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space + +// // Parse subject +// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); +// errdefer alloc.free(subject); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// after_third, +// in_fourth, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// defer second.deinit(alloc); +// var third: ArrayList(u8) = .empty; +// defer third.deinit(alloc); +// var fourth: ?ArrayList(u8) = null; +// defer if (fourth) |*f| f.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// // Drop whitespace +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_third; +// }, +// .after_third => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_third; +// } +// fourth = .empty; +// continue :sw .in_fourth; +// }, +// .in_fourth => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = +// if (fourth) |f| .{ +// try alloc.dupe(u8, second.items), +// try parseUnsigned(usize, third.items, 10), +// try parseUnsigned(usize, f.items, 10), +// } else .{ +// null, +// try parseUnsigned(usize, second.items, 10), +// try parseUnsigned(usize, third.items, 10), +// }; + +// const payload: Payload = try .read(alloc, in, total_bytes); +// errdefer payload.deinit(alloc); +// try expectStreamBytes(in, "\r\n"); + +// return .{ +// .HPUB = .{ +// .header_bytes = header_bytes, +// .@"pub" = .{ +// .subject = subject, +// .payload = payload, +// .reply_to = reply_to, +// }, +// }, +// }; +// } + +// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { +// var subject_list: ArrayList(u8) = .empty; +// errdefer subject_list.deinit(alloc); + +// // Handle the first character +// { +// const byte = try in.takeByte(); +// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) +// return error.InvalidStream; + +// try subject_list.append(alloc, byte); +// } + +// switch (pub_or_sub) { +// .sub => { +// while (in.takeByte()) |byte| { +// if (isWhitespace(byte)) break; +// if (byte == '.') { +// const next_byte = try in.peekByte(); +// if (next_byte == '.' or isWhitespace(next_byte)) +// return error.InvalidStream; +// } else if (byte == '>') { +// const next_byte = try in.takeByte(); +// if (!isWhitespace(next_byte)) +// return error.InvalidStream; +// } else if (byte == '*') { +// const next_byte = try in.peekByte(); +// if (next_byte != '.' and !isWhitespace(next_byte)) +// return error.InvalidStream; +// } +// try subject_list.append(alloc, byte); +// } else |err| return err; +// }, +// .@"pub" => { +// while (in.takeByte()) |byte| { +// if (isWhitespace(byte)) break; +// if (byte == '*' or byte == '>') return error.InvalidStream; +// if (byte == '.') { +// const next_byte = try in.peekByte(); +// if (next_byte == '.' or isWhitespace(next_byte)) +// return error.InvalidStream; +// } +// try subject_list.append(alloc, byte); +// } else |err| return err; +// }, +// } + +// return subject_list.toOwnedSlice(alloc); +// } + +// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { +// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { +// @branchHint(.unlikely); +// return error.InvalidStream; +// } +// } + +// test sub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// { +// var in: Reader = .fixed(" foo 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = null, +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = null, +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo q 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = "q", +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" 1 q 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "1", +// .queue_group = "q", +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" $SRV.PING 4\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "$SRV.PING", +// .queue_group = null, +// .sid = "4", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo.echo q 10\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo.echo", +// .queue_group = "q", +// .sid = "10", +// }, +// }, +// res, +// ); +// } +// } + +// test unsub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" 1\r\n"); +// var res = try unsub(alloc, &in); +// defer res.UNSUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .UNSUB = .{ +// .sid = "1", +// .max_msgs = null, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" 1 1\r\n"); +// var res = try unsub(alloc, &in); +// defer res.UNSUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .UNSUB = .{ +// .sid = "1", +// .max_msgs = 1, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } + +// test @"pub" { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" foo 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = null, +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = "reply.to", +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// // numeric reply subject +// { +// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = "5", +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } + +// test hpub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = null, +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = "reply.to", +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = "6", +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } diff --git a/src/Server/serialize.zig b/src/Server/serialize.zig new file mode 100644 index 0000000..b79beca --- /dev/null +++ b/src/Server/serialize.zig @@ -0,0 +1,6 @@ +const std = @import("std"); + +const message = @import("./message.zig"); +const Message = message.Message; + +pub fn message(msg: Message) |
