From 96a3705069cf33a00ded143f876734c2a045cf1e Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Wed, 7 Jan 2026 17:26:10 -0500 Subject: starting zero alloc parsing --- src/Server/Client.zig | 192 +++-- src/Server/Payload.zig | 51 ++ src/Server/message.zig | 182 +++++ src/Server/parse.zig | 1653 +++++++++++++++++++++++------------------- src/Server/parse/Payload.zig | 51 -- src/Server/parse/message.zig | 208 ------ src/Server/serialize.zig | 6 + 7 files changed, 1216 insertions(+), 1127 deletions(-) create mode 100644 src/Server/Payload.zig create mode 100644 src/Server/message.zig delete mode 100644 src/Server/parse/Payload.zig delete mode 100644 src/Server/parse/message.zig create mode 100644 src/Server/serialize.zig (limited to 'src/Server') diff --git a/src/Server/Client.zig b/src/Server/Client.zig index 9ec928c..047f38d 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -11,29 +11,23 @@ pub const Msgs = union(enum) { }; connect: ?Message.Connect, -// Used to own messages that we receive in our queues. -alloc: std.mem.Allocator, - -// Messages for this client to receive. -recv_queue: *Queue(Message), -msg_queue: *Queue(Msgs), +// Byte queue for this client to receive. +recv_queue: *Queue(u8), +// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes). +recv_queue_write_lock: std.Io.Mutex = .init, from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - alloc: std.mem.Allocator, - recv_queue: *Queue(Message), - msg_queue: *Queue(Msgs), + recv_queue: *Queue(u8), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, - .alloc = alloc, .recv_queue = recv_queue, - .msg_queue = msg_queue, .from_client = in, .to_client = out, }; @@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { } pub fn start(self: *Client, io: std.Io) !void { - var msgs_buf: [1024]Msgs = undefined; - - var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); - errdefer _ = recv_msgs_task.cancel(io) catch {}; - - var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - errdefer _ = recv_proto_task.cancel(io) catch {}; - + std.debug.assert(self.to_client.buffer.len > 0); + std.debug.assert(self.to_client.end == 0); while (true) { - switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |len_err| { - @branchHint(.likely); - const msgs = msgs_buf[0..try len_err]; - for (0..msgs.len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(self.alloc), - .HMSG => |h| h.deinit(self.alloc), - }; - errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(self.alloc); - }, - .HMSG => |h| { - h.deinit(self.alloc); - }, - }; - switch (msg) { - .MSG => |m| { - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - }, - ); - try m.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - .HMSG => |hmsg| { - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - }); - try hmsg.msg.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - } - } - recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; - }, - .proto => |msg_err| { - @branchHint(.unlikely); - const msg = try msg_err; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } - recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - }, - } + self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); try self.to_client.flush(); } + // while (true) { + // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + // .msgs => |len_err| { + // @branchHint(.likely); + // const msgs = msgs_buf[0..try len_err]; + // for (0..msgs.len) |i| { + // const msg = msgs[i]; + // defer switch (msg) { + // .MSG => |m| m.deinit(self.alloc), + // .HMSG => |h| h.deinit(self.alloc), + // }; + // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + // .MSG => |m| { + // m.deinit(self.alloc); + // }, + // .HMSG => |h| { + // h.deinit(self.alloc); + // }, + // }; + // switch (msg) { + // .MSG => |m| { + // try self.to_client.print( + // "MSG {s} {s} {s} {d}\r\n", + // .{ + // m.subject, + // m.sid, + // m.reply_to orelse "", + // m.payload.len, + // }, + // ); + // try m.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // .HMSG => |hmsg| { + // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ + // hmsg.msg.subject, + // hmsg.msg.sid, + // hmsg.msg.reply_to orelse "", + // hmsg.header_bytes, + // hmsg.msg.payload.len, + // }); + // try hmsg.msg.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // } + // } + // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; + // }, + // .proto => |msg_err| { + // @branchHint(.unlikely); + // const msg = try msg_err; + // switch (msg) { + // .@"+OK" => { + // _ = try self.to_client.write("+OK\r\n"); + // }, + // .PONG => { + // _ = try self.to_client.write("PONG\r\n"); + // }, + // .INFO => |info| { + // _ = try self.to_client.write("INFO "); + // try std.json.Stringify.value(info, .{}, self.to_client); + // _ = try self.to_client.write("\r\n"); + // }, + // .@"-ERR" => |s| { + // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + // }, + // else => |m| { + // std.debug.panic("unimplemented write: {any}\n", .{m}); + // }, + // } + // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + // }, + // } + // try self.to_client.flush(); + // } } -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - switch (msg) { - .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), - .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), - else => try self.recv_queue.putOne(io, msg), - } +pub fn send(self: *Client, io: std.Io, msg: []const u8) !void { + try self.recv_queue.putAll(io, msg); } test send { @@ -148,19 +136,15 @@ test send { var buf: [1024]u8 = undefined; break :blk &buf; }); - var recv_queue: Queue(Message) = .init(&.{}); - var msgs_queue: Queue(Msgs) = .init(blk: { - var buf: [1]Msgs = undefined; - break :blk &buf; - }); - var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + var recv_queue: Queue(u8) = .init(&.{}); + var client: Client = .init(null, &recv_queue, undefined, &to_client); defer client.deinit(gpa); var c_task = try io.concurrent(Client.start, .{ &client, io }); defer c_task.cancel(io) catch {}; { - try client.send(io, .PONG); + try client.send(io, "PONG\r\n"); // Wait for the concurrent client task to write to the writer try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); diff --git a/src/Server/Payload.zig b/src/Server/Payload.zig new file mode 100644 index 0000000..b512a81 --- /dev/null +++ b/src/Server/Payload.zig @@ -0,0 +1,51 @@ +const std = @import("std"); +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const Allocator = std.mem.Allocator; + +const Payload = @This(); + +len: u32, +short: [128]u8, +long: ?[]u8, + +pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; +} + +pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } +} + +pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } +} + +pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; +} diff --git a/src/Server/message.zig b/src/Server/message.zig new file mode 100644 index 0000000..f22410f --- /dev/null +++ b/src/Server/message.zig @@ -0,0 +1,182 @@ +const std = @import("std"); +const ArrayList = std.ArrayList; +const Allocator = std.mem.Allocator; +const Reader = std.Io.Reader; + +const Payload = @import("Payload.zig"); + +pub const Control = @typeInfo(Message).@"union".tag_type.?; + +pub const Message = union(enum) { + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + + pub fn deinit(self: Connect, alloc: Allocator) void { + if (self.auth_token) |a| alloc.free(a); + if (self.user) |u| alloc.free(u); + if (self.pass) |p| alloc.free(p); + if (self.name) |n| alloc.free(n); + alloc.free(self.lang); + alloc.free(self.version); + if (self.sig) |s| alloc.free(s); + if (self.jwt) |j| alloc.free(j); + if (self.nkey) |n| alloc.free(n); + } + + pub fn dupe(self: Connect, alloc: Allocator) !Connect { + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.lang = try alloc.dupe(u8, self.lang); + errdefer alloc.free(res.lang); + res.version = try alloc.dupe(u8, self.version); + errdefer alloc.free(res.version); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; + } + }; + pub const Pub = struct { + /// The destination subject to publish to. + subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. + reply_to: ?[]const u8 = null, + /// The message payload data. + payload: []const u8, + + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + const res: Msg = .{ + .subject = self.subject, + .sid = sid, + .reply_to = self.reply_to, + .payload = self.payload, + }; + return res.dupe(alloc); + } + }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); + return res; + } + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + + pub fn deinit(self: Sub, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } + }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: Allocator) void { + alloc.free(self.sid); + } + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, + payload: []const u8, + }; +}; diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 70dd2bc..6e013c4 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -15,9 +15,9 @@ const isWhitespace = std.ascii.isWhitespace; const parseUnsigned = std.fmt.parseUnsigned; -const message = @import("./parse/message.zig"); +const message = @import("./message.zig"); pub const Message = message.Message; -pub const Payload = @import("./parse/Payload.zig"); +pub const Payload = @import("./Payload.zig"); const client_control = StaticStringMap(message.Control).initComptime( .{ @@ -35,801 +35,926 @@ const client_control = StaticStringMap(message.Control).initComptime( // {"-ERR", .@"-err"}, }, ); -fn parseStaticStringMap(input: []const u8) ?message.Control { - return client_control.get(input); -} - -/// Parse a string into its associated MessageType. -const parse = parseStaticStringMap; -/// Get the next Message from the input stream. -pub fn next(alloc: Allocator, in: *Reader) !Message { - var operation_string: ArrayList(u8) = blk: { - comptime var buf_len = 0; - comptime { - for (client_control.keys()) |key| { - buf_len = @max(buf_len, key.len); - } +pub fn control(in: *Reader) !message.Control { + const longest_ctrl = comptime blk: { + var min_len = 0; + for (client_control.keys()) |ctrl| { + min_len = @max(ctrl.len, min_len); } - var buf: [buf_len]u8 = undefined; - break :blk .initBuffer(&buf); - }; - - while (in.peekByte()) |byte| { - if (isUpper(byte)) { - try operation_string.appendBounded(byte); - in.toss(1); - } else break; - } else |err| return err; - - const operation = parse(operation_string.items) orelse { - log.err("Invalid operation: '{s}'", .{operation_string.items}); - return error.InvalidOperation; - }; - - errdefer log.err("Failed to parse {s}", .{operation_string.items}); - - switch (operation) { - .CONNECT => return connect(alloc, in), - .PUB => { - @branchHint(.likely); - return @"pub"(alloc, in); - }, - .HPUB => { - @branchHint(.likely); - return hpub(alloc, in); - }, - .PING => { - try expectStreamBytes(in, "\r\n"); - return .PING; - }, - .PONG => { - try expectStreamBytes(in, "\r\n"); - return .PONG; - }, - .SUB => return sub(alloc, in), - .UNSUB => return unsub(alloc, in), - else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), - } -} - -pub fn connect(alloc: Allocator, in: *Reader) !Message { - // for storing the json string - var connect_string_writer_allocating: AllocatingWriter = .init(alloc); - defer connect_string_writer_allocating.deinit(); - var connect_string_writer = &connect_string_writer_allocating.writer; - - // for parsing the json string - var connect_arena_allocator: ArenaAllocator = .init(alloc); - defer connect_arena_allocator.deinit(); - const connect_allocator = connect_arena_allocator.allocator(); - - try in.discardAll(1); // throw away space - - // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); - try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - - const connect_str = try connect_string_writer_allocating.toOwnedSlice(); - defer alloc.free(connect_str); - const res = try std.json.parseFromSliceLeaky( - Message.Connect, - connect_allocator, - connect_str, - .{ .allocate = .alloc_always }, - ); - - return .{ .CONNECT = try res.dupe(alloc) }; -} - -pub fn sub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - const subject = try readSubject(alloc, in, .sub); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - errdefer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - errdefer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .SUB = .{ - .subject = subject, - .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, - .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), - }, - }; -} - -pub fn unsub(alloc: Allocator, in: *Reader) !Message { - const States = enum { - before_first, - in_first, - after_first, - in_second, - in_end, - }; - - var first: ArrayList(u8) = .empty; - errdefer first.deinit(alloc); - var second: ?ArrayList(u8) = null; - defer if (second) |*s| s.deinit(alloc); - - sw: switch (@as(States, .before_first)) { - .before_first => { - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_first; - } - continue :sw .in_first; - }, - .in_first => { - const byte = try in.peekByte(); - if (!isWhitespace(byte)) { - try first.append(alloc, byte); - in.toss(1); - continue :sw .in_first; - } - continue :sw .after_first; - }, - .after_first => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_first; - } - second = .empty; - continue :sw .in_second; - }, - .in_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } - try second.?.append(alloc, byte); - in.toss(1); - continue :sw .in_second; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - return .{ - .UNSUB = .{ - .sid = try first.toOwnedSlice(alloc), - .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, - }, - }; -} - -pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - in_end, - }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ?ArrayList(u8) = null; - defer if (third) |*t| t.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; - } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const bytes: usize = - if (third) |t| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, t.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - }; - - const payload: Payload = try .read(alloc, in, bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .PUB = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }; -} - -pub fn hpub(alloc: Allocator, in: *Reader) !Message { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); - - const States = enum { - before_second, - in_second, - after_second, - in_third, - after_third, - in_fourth, - in_end, + break :blk min_len; }; - - var second: ArrayList(u8) = .empty; - defer second.deinit(alloc); - var third: ArrayList(u8) = .empty; - defer third.deinit(alloc); - var fourth: ?ArrayList(u8) = null; - defer if (fourth) |*f| f.deinit(alloc); - - sw: switch (@as(States, .before_second)) { - .before_second => { - // Drop whitespace - const byte = try in.peekByte(); - if (isWhitespace(byte)) { - in.toss(1); - continue :sw .before_second; - } - continue :sw .in_second; - }, - .in_second => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_second; - }, - .after_second => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_second; + std.debug.assert(in.buffer.len >= longest_ctrl); + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + if (iter.next()) |str| { + if (client_control.get(str)) |ctrl| { + in.toss(str.len); + return ctrl; + } else if (str.len >= longest_ctrl) { + return error.InvalidControl; } - third = .empty; - continue :sw .in_third; - }, - .in_third => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .after_third; - }, - .after_third => { - const byte = try in.peekByte(); - if (byte == '\r') { - continue :sw .in_end; - } else if (isWhitespace(byte)) { - in.toss(1); - continue :sw .after_third; - } - fourth = .empty; - continue :sw .in_fourth; - }, - .in_fourth => { - for (1..in.buffer.len) |i| { - try in.fill(i + 1); - if (isWhitespace(in.buffered()[i])) { - @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); - in.toss(i); - break; - } - } else return error.EndOfStream; - continue :sw .in_end; - }, - .in_end => { - try expectStreamBytes(in, "\r\n"); - }, - } - - const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = - if (fourth) |f| .{ - try alloc.dupe(u8, second.items), - try parseUnsigned(usize, third.items, 10), - try parseUnsigned(usize, f.items, 10), - } else .{ - null, - try parseUnsigned(usize, second.items, 10), - try parseUnsigned(usize, third.items, 10), - }; - - const payload: Payload = try .read(alloc, in, total_bytes); - errdefer payload.deinit(alloc); - try expectStreamBytes(in, "\r\n"); - - return .{ - .HPUB = .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .payload = payload, - .reply_to = reply_to, - }, - }, - }; -} - -fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { - var subject_list: ArrayList(u8) = .empty; - errdefer subject_list.deinit(alloc); - - // Handle the first character - { - const byte = try in.takeByte(); - if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) - return error.InvalidStream; - - try subject_list.append(alloc, byte); - } - - switch (pub_or_sub) { - .sub => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '>') { - const next_byte = try in.takeByte(); - if (!isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '*') { - const next_byte = try in.peekByte(); - if (next_byte != '.' and !isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - .@"pub" => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '*' or byte == '>') return error.InvalidStream; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - } - - return subject_list.toOwnedSlice(alloc); -} - -inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { - if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { - @branchHint(.unlikely); - return error.InvalidStream; + } + try in.fillMore(); } } -test sub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; +test control { { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PUB " }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, " ", in.interface.buffered()); } { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PUB" }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } { - var in: Reader = .fixed(" foo q 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PU" }, + .{ .buffer = "B" }, + }); + try std.testing.expectEqual(.PUB, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } { - var in: Reader = .fixed(" 1 q 1\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "1", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "PIN" }, + .{ .buffer = "G\r\n" }, + }); + try std.testing.expectEqual(.PING, try control(&in.interface)); + try std.testing.expectEqualSlices(u8, "\r\n", in.interface.buffered()); } { - var in: Reader = .fixed(" $SRV.PING 4\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "$SRV.PING", - .queue_group = null, - .sid = "4", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "CONNECT" }, + }); + try std.testing.expectEqual(.CONNECT, try control(&in.interface)); } { - var in: Reader = .fixed(" foo.echo q 10\r\n"); - var res = try sub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo.echo", - .queue_group = "q", - .sid = "10", - }, - }, - res, - ); + var buf: [7]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "CONNECC" }, + }); + try std.testing.expectError(error.InvalidControl, control(&in.interface)); } } -test unsub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" 1\r\n"); - var res = try unsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = null, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } +/// The return value is owned by the reader passed to this function. +/// Operations that modify the readers buffer invalidates this value. +pub fn @"pub"(in: *Reader) !Message.Pub { + // TODO: Add pedantic option. + // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 + + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |subject| { + if (iter.next()) |second| { + if (in.buffered()[iter.index] == '\r') { + const bytes_str = second; + const bytes = try parseUnsigned(usize, bytes_str, 10); + + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + bytes + 4); + return .{ + .subject = subject, + .reply_to = null, + .payload = iter.rest()[1 .. 1 + bytes], + }; + } - { - var in: Reader = .fixed(" 1 1\r\n"); - var res = try unsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = 1, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); + const reply_to = second; + if (iter.next()) |bytes_str| { + const bytes = try parseUnsigned(usize, bytes_str, 10); + + if (in.buffered()[iter.index] == '\r') { + if (iter.rest().len > bytes) { + // 4 bytes for CRLF on either side of the payload. + in.toss(iter.index + bytes + 4); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = iter.rest()[1 .. 1 + bytes], + }; + } + } + } + } + } + + try in.fillMore(); } } test @"pub" { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; { - var in: Reader = .fixed(" foo 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo bar 2\r\nhi\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = "bar", + .payload = "hi", }, - res, + try @"pub"(&in.interface), ); - try expectEqual(0, in.buffered().len); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } - { - var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo 2\r\nhi\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = null, + .payload = "hi", }, - res, + try @"pub"(&in.interface), ); - try expectEqual(0, in.buffered().len); - } - - // numeric reply subject - { - var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); - var res = try @"pub"(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "5", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } } -test hpub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try hpub(alloc, &in); - defer res.HPUB.deinit(alloc); - try expectEqualDeep( - Message{ - .HPUB = .{ - .header_bytes = 22, - .@"pub" = .{ - .subject = "foo", - .reply_to = "6", - .payload = .{ - .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, - .short = blk: { - var s: [128]u8 = undefined; - const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} +// /// Get the next Message from the input stream. +// pub fn next(alloc: Allocator, in: *Reader) !Message { +// errdefer log.err("Failed to parse {s}", .{operation_string.items}); + +// switch (operation) { +// .CONNECT => return connect(alloc, in), +// .PUB => { +// @branchHint(.likely); +// return @"pub"(alloc, in); +// }, +// .HPUB => { +// @branchHint(.likely); +// return hpub(alloc, in); +// }, +// .PING => { +// try expectStreamBytes(in, "\r\n"); +// return .PING; +// }, +// .PONG => { +// try expectStreamBytes(in, "\r\n"); +// return .PONG; +// }, +// .SUB => return sub(alloc, in), +// .UNSUB => return unsub(alloc, in), +// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), +// } +// } + +// pub fn connect(alloc: Allocator, in: *Reader) !Message { +// // for storing the json string +// var connect_string_writer_allocating: AllocatingWriter = .init(alloc); +// defer connect_string_writer_allocating.deinit(); +// var connect_string_writer = &connect_string_writer_allocating.writer; + +// // for parsing the json string +// var connect_arena_allocator: ArenaAllocator = .init(alloc); +// defer connect_arena_allocator.deinit(); +// const connect_allocator = connect_arena_allocator.allocator(); + +// try in.discardAll(1); // throw away space + +// // Should read the next JSON object to the fixed buffer writer. +// _ = try in.streamDelimiter(connect_string_writer, '}'); +// try connect_string_writer.writeByte('}'); +// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + +// const connect_str = try connect_string_writer_allocating.toOwnedSlice(); +// defer alloc.free(connect_str); +// const res = try std.json.parseFromSliceLeaky( +// Message.Connect, +// connect_allocator, +// connect_str, +// .{ .allocate = .alloc_always }, +// ); + +// return .{ .CONNECT = try res.dupe(alloc) }; +// } + +// pub fn sub(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space +// const subject = try readSubject(alloc, in, .sub); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// errdefer second.deinit(alloc); +// var third: ?ArrayList(u8) = null; +// errdefer if (third) |*t| t.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// return .{ +// .SUB = .{ +// .subject = subject, +// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, +// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), +// }, +// }; +// } + +// pub fn unsub(alloc: Allocator, in: *Reader) !Message { +// const States = enum { +// before_first, +// in_first, +// after_first, +// in_second, +// in_end, +// }; + +// var first: ArrayList(u8) = .empty; +// errdefer first.deinit(alloc); +// var second: ?ArrayList(u8) = null; +// defer if (second) |*s| s.deinit(alloc); + +// sw: switch (@as(States, .before_first)) { +// .before_first => { +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_first; +// } +// continue :sw .in_first; +// }, +// .in_first => { +// const byte = try in.peekByte(); +// if (!isWhitespace(byte)) { +// try first.append(alloc, byte); +// in.toss(1); +// continue :sw .in_first; +// } +// continue :sw .after_first; +// }, +// .after_first => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_first; +// } +// second = .empty; +// continue :sw .in_second; +// }, +// .in_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } +// try second.?.append(alloc, byte); +// in.toss(1); +// continue :sw .in_second; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// return .{ +// .UNSUB = .{ +// .sid = try first.toOwnedSlice(alloc), +// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, +// }, +// }; +// } + +// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space + +// // Parse subject +// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); +// errdefer alloc.free(subject); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// defer second.deinit(alloc); +// var third: ?ArrayList(u8) = null; +// defer if (third) |*t| t.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// // Drop whitespace +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// const reply_to: ?[]const u8, const bytes: usize = +// if (third) |t| .{ +// try alloc.dupe(u8, second.items), +// try parseUnsigned(usize, t.items, 10), +// } else .{ +// null, +// try parseUnsigned(usize, second.items, 10), +// }; + +// const payload: Payload = try .read(alloc, in, bytes); +// errdefer payload.deinit(alloc); +// try expectStreamBytes(in, "\r\n"); + +// return .{ +// .PUB = .{ +// .subject = subject, +// .payload = payload, +// .reply_to = reply_to, +// }, +// }; +// } + +// pub fn hpub(alloc: Allocator, in: *Reader) !Message { +// try in.discardAll(1); // throw away space + +// // Parse subject +// const subject: []const u8 = try readSubject(alloc, in, .@"pub"); +// errdefer alloc.free(subject); + +// const States = enum { +// before_second, +// in_second, +// after_second, +// in_third, +// after_third, +// in_fourth, +// in_end, +// }; + +// var second: ArrayList(u8) = .empty; +// defer second.deinit(alloc); +// var third: ArrayList(u8) = .empty; +// defer third.deinit(alloc); +// var fourth: ?ArrayList(u8) = null; +// defer if (fourth) |*f| f.deinit(alloc); + +// sw: switch (@as(States, .before_second)) { +// .before_second => { +// // Drop whitespace +// const byte = try in.peekByte(); +// if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .before_second; +// } +// continue :sw .in_second; +// }, +// .in_second => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_second; +// }, +// .after_second => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_second; +// } +// third = .empty; +// continue :sw .in_third; +// }, +// .in_third => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .after_third; +// }, +// .after_third => { +// const byte = try in.peekByte(); +// if (byte == '\r') { +// continue :sw .in_end; +// } else if (isWhitespace(byte)) { +// in.toss(1); +// continue :sw .after_third; +// } +// fourth = .empty; +// continue :sw .in_fourth; +// }, +// .in_fourth => { +// for (1..in.buffer.len) |i| { +// try in.fill(i + 1); +// if (isWhitespace(in.buffered()[i])) { +// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); +// in.toss(i); +// break; +// } +// } else return error.EndOfStream; +// continue :sw .in_end; +// }, +// .in_end => { +// try expectStreamBytes(in, "\r\n"); +// }, +// } + +// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = +// if (fourth) |f| .{ +// try alloc.dupe(u8, second.items), +// try parseUnsigned(usize, third.items, 10), +// try parseUnsigned(usize, f.items, 10), +// } else .{ +// null, +// try parseUnsigned(usize, second.items, 10), +// try parseUnsigned(usize, third.items, 10), +// }; + +// const payload: Payload = try .read(alloc, in, total_bytes); +// errdefer payload.deinit(alloc); +// try expectStreamBytes(in, "\r\n"); + +// return .{ +// .HPUB = .{ +// .header_bytes = header_bytes, +// .@"pub" = .{ +// .subject = subject, +// .payload = payload, +// .reply_to = reply_to, +// }, +// }, +// }; +// } + +// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { +// var subject_list: ArrayList(u8) = .empty; +// errdefer subject_list.deinit(alloc); + +// // Handle the first character +// { +// const byte = try in.takeByte(); +// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) +// return error.InvalidStream; + +// try subject_list.append(alloc, byte); +// } + +// switch (pub_or_sub) { +// .sub => { +// while (in.takeByte()) |byte| { +// if (isWhitespace(byte)) break; +// if (byte == '.') { +// const next_byte = try in.peekByte(); +// if (next_byte == '.' or isWhitespace(next_byte)) +// return error.InvalidStream; +// } else if (byte == '>') { +// const next_byte = try in.takeByte(); +// if (!isWhitespace(next_byte)) +// return error.InvalidStream; +// } else if (byte == '*') { +// const next_byte = try in.peekByte(); +// if (next_byte != '.' and !isWhitespace(next_byte)) +// return error.InvalidStream; +// } +// try subject_list.append(alloc, byte); +// } else |err| return err; +// }, +// .@"pub" => { +// while (in.takeByte()) |byte| { +// if (isWhitespace(byte)) break; +// if (byte == '*' or byte == '>') return error.InvalidStream; +// if (byte == '.') { +// const next_byte = try in.peekByte(); +// if (next_byte == '.' or isWhitespace(next_byte)) +// return error.InvalidStream; +// } +// try subject_list.append(alloc, byte); +// } else |err| return err; +// }, +// } + +// return subject_list.toOwnedSlice(alloc); +// } + +// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { +// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { +// @branchHint(.unlikely); +// return error.InvalidStream; +// } +// } + +// test sub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// { +// var in: Reader = .fixed(" foo 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = null, +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = null, +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo q 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo", +// .queue_group = "q", +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" 1 q 1\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "1", +// .queue_group = "q", +// .sid = "1", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" $SRV.PING 4\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "$SRV.PING", +// .queue_group = null, +// .sid = "4", +// }, +// }, +// res, +// ); +// } +// { +// var in: Reader = .fixed(" foo.echo q 10\r\n"); +// var res = try sub(alloc, &in); +// defer res.SUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .SUB = .{ +// .subject = "foo.echo", +// .queue_group = "q", +// .sid = "10", +// }, +// }, +// res, +// ); +// } +// } + +// test unsub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" 1\r\n"); +// var res = try unsub(alloc, &in); +// defer res.UNSUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .UNSUB = .{ +// .sid = "1", +// .max_msgs = null, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" 1 1\r\n"); +// var res = try unsub(alloc, &in); +// defer res.UNSUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .UNSUB = .{ +// .sid = "1", +// .max_msgs = 1, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } + +// test @"pub" { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" foo 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = null, +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = "reply.to", +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// // numeric reply subject +// { +// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); +// var res = try @"pub"(alloc, &in); +// defer res.PUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .PUB = .{ +// .subject = "foo", +// .reply_to = "5", +// .payload = .{ +// .len = 3, +// .short = blk: { +// var s: [128]u8 = undefined; +// @memcpy(s[0..3], "bar"); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } + +// test hpub { +// const alloc = std.testing.allocator; +// const expectEqualDeep = std.testing.expectEqualDeep; +// const expectEqual = std.testing.expectEqual; +// { +// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = null, +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = "reply.to", +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } + +// { +// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); +// var res = try hpub(alloc, &in); +// defer res.HPUB.deinit(alloc); +// try expectEqualDeep( +// Message{ +// .HPUB = .{ +// .header_bytes = 22, +// .@"pub" = .{ +// .subject = "foo", +// .reply_to = "6", +// .payload = .{ +// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, +// .short = blk: { +// var s: [128]u8 = undefined; +// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; +// @memcpy(s[0..str.len], str); +// break :blk s; +// }, +// .long = null, +// }, +// }, +// }, +// }, +// res, +// ); +// try expectEqual(0, in.buffered().len); +// } +// } diff --git a/src/Server/parse/Payload.zig b/src/Server/parse/Payload.zig deleted file mode 100644 index b512a81..0000000 --- a/src/Server/parse/Payload.zig +++ /dev/null @@ -1,51 +0,0 @@ -const std = @import("std"); -const Reader = std.Io.Reader; -const Writer = std.Io.Writer; -const Allocator = std.mem.Allocator; - -const Payload = @This(); - -len: u32, -short: [128]u8, -long: ?[]u8, - -pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { - var res: Payload = .{ - .len = @intCast(bytes), - .short = undefined, - .long = null, - }; - - try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); - if (bytes > res.short.len) { - const long = try alloc.alloc(u8, bytes - res.short.len); - errdefer alloc.free(long); - try in.readSliceAll(long); - res.long = long; - } - return res; -} - -pub fn write(self: Payload, out: *Writer) !void { - std.debug.assert(out.buffer.len >= self.short.len); - std.debug.assert(self.len <= self.short.len or self.long != null); - try out.writeAll(self.short[0..@min(self.len, self.short.len)]); - if (self.long) |l| { - try out.writeAll(l); - } -} - -pub fn deinit(self: Payload, alloc: Allocator) void { - if (self.long) |l| { - alloc.free(l); - } -} - -pub fn dupe(self: Payload, alloc: Allocator) !Payload { - var res = self; - if (self.long) |l| { - res.long = try alloc.dupe(u8, l); - } - errdefer if (res.long) |l| alloc.free(l); - return res; -} diff --git a/src/Server/parse/message.zig b/src/Server/parse/message.zig deleted file mode 100644 index c8a308f..0000000 --- a/src/Server/parse/message.zig +++ /dev/null @@ -1,208 +0,0 @@ -const std = @import("std"); -const ArrayList = std.ArrayList; -const Allocator = std.mem.Allocator; -const Reader = std.Io.Reader; - -const Payload = @import("Payload.zig"); - -pub const Control = @typeInfo(Message).@"union".tag_type.?; - -pub const Message = union(enum) { - INFO: ServerInfo, - CONNECT: Connect, - PUB: Pub, - HPUB: HPub, - SUB: Sub, - UNSUB: Unsub, - MSG: Msg, - HMSG: HMsg, - PING, - PONG, - @"+OK": void, - @"-ERR": []const u8, - pub const ServerInfo = struct { - /// The unique identifier of the NATS server. - server_id: []const u8, - /// The name of the NATS server. - server_name: []const u8, - /// The version of NATS. - version: []const u8, - /// The version of golang the NATS server was built with. - go: []const u8 = "0.0.0", - /// The IP address used to start the NATS server, - /// by default this will be 0.0.0.0 and can be - /// configured with -client_advertise host:port. - host: []const u8 = "0.0.0.0", - /// The port number the NATS server is configured - /// to listen on. - port: u16 = 4222, - /// Whether the server supports headers. - headers: bool = false, - /// Maximum payload size, in bytes, that the server - /// will accept from the client. - max_payload: u64, - /// An integer indicating the protocol version of - /// the server. The server version 1.2.0 sets this - /// to 1 to indicate that it supports the "Echo" - /// feature. - proto: u32 = 1, - }; - pub const Connect = struct { - verbose: bool = false, - pedantic: bool = false, - tls_required: bool = false, - auth_token: ?[]const u8 = null, - user: ?[]const u8 = null, - pass: ?[]const u8 = null, - name: ?[]const u8 = null, - lang: []const u8, - version: []const u8, - protocol: u32, - echo: ?bool = null, - sig: ?[]const u8 = null, - jwt: ?[]const u8 = null, - no_responders: ?bool = null, - headers: ?bool = null, - nkey: ?[]const u8 = null, - - pub fn deinit(self: Connect, alloc: Allocator) void { - if (self.auth_token) |a| alloc.free(a); - if (self.user) |u| alloc.free(u); - if (self.pass) |p| alloc.free(p); - if (self.name) |n| alloc.free(n); - alloc.free(self.lang); - alloc.free(self.version); - if (self.sig) |s| alloc.free(s); - if (self.jwt) |j| alloc.free(j); - if (self.nkey) |n| alloc.free(n); - } - - pub fn dupe(self: Connect, alloc: Allocator) !Connect { - var res = self; - res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; - errdefer if (res.auth_token) |a| alloc.free(a); - res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; - errdefer if (res.user) |u| alloc.free(u); - res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; - errdefer if (res.pass) |p| alloc.free(p); - res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.name) |n| alloc.free(n); - res.lang = try alloc.dupe(u8, self.lang); - errdefer alloc.free(res.lang); - res.version = try alloc.dupe(u8, self.version); - errdefer alloc.free(res.version); - res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; - errdefer if (res.sig) |s| alloc.free(s); - res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; - errdefer if (res.jwt) |j| alloc.free(j); - res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.nkey) |n| alloc.free(n); - return res; - } - }; - pub const Pub = struct { - /// The destination subject to publish to. - subject: []const u8, - /// The reply subject that subscribers can use to send a response back to the publisher/requestor. - reply_to: ?[]const u8 = null, - /// The message payload data. - payload: Payload, - - pub fn deinit(self: Pub, alloc: Allocator) void { - alloc.free(self.subject); - self.payload.deinit(alloc); - if (self.reply_to) |r| alloc.free(r); - } - - pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { - const res: Msg = .{ - .subject = self.subject, - .sid = sid, - .reply_to = self.reply_to, - .payload = self.payload, - }; - return res.dupe(alloc); - } - }; - pub const HPub = struct { - header_bytes: usize, - @"pub": Pub, - - pub fn deinit(self: HPub, alloc: Allocator) void { - self.@"pub".deinit(alloc); - } - - pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { - return .{ - .header_bytes = self.header_bytes, - .msg = try self.@"pub".toMsg(alloc, sid), - }; - } - }; - - pub const HMsg = struct { - header_bytes: usize, - msg: Msg, - - pub fn deinit(self: HMsg, alloc: Allocator) void { - self.msg.deinit(alloc); - } - - pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { - var res = self; - res.msg = try self.msg.dupe(alloc); - errdefer alloc.free(res.msg); - return res; - } - }; - pub const Sub = struct { - /// The subject name to subscribe to. - subject: []const u8, - /// If specified, the subscriber will join this queue group. - queue_group: ?[]const u8, - /// A unique alphanumeric subscription ID, generated by the client. - sid: []const u8, - - pub fn deinit(self: Sub, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.queue_group) |q| alloc.free(q); - } - }; - pub const Unsub = struct { - /// The unique alphanumeric subscription ID of the subject to unsubscribe from. - sid: []const u8, - /// A number of messages to wait for before automatically unsubscribing. - max_msgs: ?usize = null, - - pub fn deinit(self: Unsub, alloc: Allocator) void { - alloc.free(self.sid); - } - }; - pub const Msg = struct { - subject: []const u8, - sid: []const u8, - reply_to: ?[]const u8, - payload: Payload, - - pub fn deinit(self: Msg, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.reply_to) |r| alloc.free(r); - self.payload.deinit(alloc); - } - - pub fn dupe(self: Msg, alloc: Allocator) !Msg { - var res: Msg = undefined; - res.subject = try alloc.dupe(u8, self.subject); - errdefer alloc.free(res.subject); - res.sid = try alloc.dupe(u8, self.sid); - errdefer alloc.free(res.sid); - res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; - errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try self.payload.dupe(alloc); - errdefer alloc.free(res.payload); - return res; - } - }; -}; diff --git a/src/Server/serialize.zig b/src/Server/serialize.zig new file mode 100644 index 0000000..b79beca --- /dev/null +++ b/src/Server/serialize.zig @@ -0,0 +1,6 @@ +const std = @import("std"); + +const message = @import("./message.zig"); +const Message = message.Message; + +pub fn message(msg: Message) -- cgit