summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 21:56:39 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 21:59:41 -0500
commit48969283527e0db6b71893b2b3f3bbeb21e522db (patch)
treedee322b777c79ef1cf7133bb65f226488b2cdab7
parentcc036318387cc5c44f2a0a2a1e28d067f3e6bdf6 (diff)
Major restructuring
This makes things much easier to use as a library
-rw-r--r--src/Server.zig6
-rw-r--r--src/Server/Client.zig5
-rw-r--r--src/Server/parse.zig (renamed from src/Server/message_parser.zig)1006
-rw-r--r--src/Server/parse/Payload.zig51
-rw-r--r--src/Server/parse/message.zig208
-rw-r--r--src/main.zig6
-rw-r--r--src/subcommand/server.zig2
7 files changed, 619 insertions, 665 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 85ddd9e..c2cc17d 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -13,10 +13,10 @@ const Stream = std.Io.net.Stream;
pub const Client = @import("./Server/Client.zig");
-const message_parser = @import("./Server/message_parser.zig");
+pub const parse = @import("./Server/parse.zig");
-pub const MessageType = message_parser.MessageType;
-pub const Message = message_parser.Message;
+const MessageType = parse.MessageType;
+const Message = parse.Message;
const ServerInfo = Message.ServerInfo;
const Msgs = Client.Msgs;
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
index dff3534..9ec928c 100644
--- a/src/Server/Client.zig
+++ b/src/Server/Client.zig
@@ -1,4 +1,5 @@
-const Message = @import("message_parser.zig").Message;
+const parse = @import("parse.zig");
+const Message = parse.Message;
const std = @import("std");
const Queue = std.Io.Queue;
@@ -193,7 +194,7 @@ test send {
}
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
- return Message.next(allocator, self.from_client);
+ return parse.next(allocator, self.from_client);
}
test next {
diff --git a/src/Server/message_parser.zig b/src/Server/parse.zig
index fd1b5b1..d58c0e5 100644
--- a/src/Server/message_parser.zig
+++ b/src/Server/parse.zig
@@ -1,352 +1,99 @@
const std = @import("std");
-const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
+const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
+const Reader = std.Io.Reader;
+const Writer = std.Io.Writer;
+const AllocatingWriter = std.Io.Writer.Allocating;
const StaticStringMap = std.StaticStringMap;
-const Io = std.Io;
-const Writer = Io.Writer;
-const AllocatingWriter = Writer.Allocating;
-const Reader = Io.Reader;
+const log = std.log;
-const ascii = std.ascii;
const isDigit = std.ascii.isDigit;
const isUpper = std.ascii.isUpper;
const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
-const log = std.log;
-
-pub const Payload = struct {
- len: u32,
- short: [128]u8,
- long: ?[]u8,
-
- pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload {
- var res: Payload = .{
- .len = @intCast(bytes),
- .short = undefined,
- .long = null,
- };
-
- try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]);
- if (bytes > res.short.len) {
- const long = try alloc.alloc(u8, bytes - res.short.len);
- errdefer alloc.free(long);
- try in.readSliceAll(long);
- res.long = long;
- }
- return res;
- }
-
- pub fn write(self: Payload, out: *Writer) !void {
- std.debug.assert(out.buffer.len >= self.short.len);
- std.debug.assert(self.len <= self.short.len or self.long != null);
- try out.writeAll(self.short[0..@min(self.len, self.short.len)]);
- if (self.long) |l| {
- try out.writeAll(l);
- }
- }
-
- pub fn deinit(self: Payload, alloc: Allocator) void {
- if (self.long) |l| {
- alloc.free(l);
- }
- }
-
- pub fn dupe(self: Payload, alloc: Allocator) !Payload {
- var res = self;
- if (self.long) |l| {
- res.long = try alloc.dupe(u8, l);
- }
- errdefer if (res.long) |l| alloc.free(l);
- return res;
- }
-};
-
-pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
-
-pub const Message = union(enum) {
- INFO: ServerInfo,
- CONNECT: Connect,
- PUB: Pub,
- HPUB: HPub,
- SUB: Sub,
- UNSUB: Unsub,
- MSG: Msg,
- HMSG: HMsg,
- PING,
- PONG,
- @"+OK": void,
- @"-ERR": []const u8,
- pub const ServerInfo = struct {
- /// The unique identifier of the NATS server.
- server_id: []const u8,
- /// The name of the NATS server.
- server_name: []const u8,
- /// The version of NATS.
- version: []const u8,
- /// The version of golang the NATS server was built with.
- go: []const u8 = "0.0.0",
- /// The IP address used to start the NATS server,
- /// by default this will be 0.0.0.0 and can be
- /// configured with -client_advertise host:port.
- host: []const u8 = "0.0.0.0",
- /// The port number the NATS server is configured
- /// to listen on.
- port: u16 = 4222,
- /// Whether the server supports headers.
- headers: bool = false,
- /// Maximum payload size, in bytes, that the server
- /// will accept from the client.
- max_payload: u64,
- /// An integer indicating the protocol version of
- /// the server. The server version 1.2.0 sets this
- /// to 1 to indicate that it supports the "Echo"
- /// feature.
- proto: u32 = 1,
- };
- pub const Connect = struct {
- verbose: bool = false,
- pedantic: bool = false,
- tls_required: bool = false,
- auth_token: ?[]const u8 = null,
- user: ?[]const u8 = null,
- pass: ?[]const u8 = null,
- name: ?[]const u8 = null,
- lang: []const u8,
- version: []const u8,
- protocol: u32,
- echo: ?bool = null,
- sig: ?[]const u8 = null,
- jwt: ?[]const u8 = null,
- no_responders: ?bool = null,
- headers: ?bool = null,
- nkey: ?[]const u8 = null,
-
- pub fn deinit(self: Connect, alloc: Allocator) void {
- if (self.auth_token) |a| alloc.free(a);
- if (self.user) |u| alloc.free(u);
- if (self.pass) |p| alloc.free(p);
- if (self.name) |n| alloc.free(n);
- alloc.free(self.lang);
- alloc.free(self.version);
- if (self.sig) |s| alloc.free(s);
- if (self.jwt) |j| alloc.free(j);
- if (self.nkey) |n| alloc.free(n);
- }
-
- pub fn dupe(self: Connect, alloc: Allocator) !Connect {
- var res = self;
- res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null;
- errdefer if (res.auth_token) |a| alloc.free(a);
- res.user = if (self.user) |u| try alloc.dupe(u8, u) else null;
- errdefer if (res.user) |u| alloc.free(u);
- res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null;
- errdefer if (res.pass) |p| alloc.free(p);
- res.name = if (self.name) |n| try alloc.dupe(u8, n) else null;
- errdefer if (res.name) |n| alloc.free(n);
- res.lang = try alloc.dupe(u8, self.lang);
- errdefer alloc.free(res.lang);
- res.version = try alloc.dupe(u8, self.version);
- errdefer alloc.free(res.version);
- res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null;
- errdefer if (res.sig) |s| alloc.free(s);
- res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null;
- errdefer if (res.jwt) |j| alloc.free(j);
- res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null;
- errdefer if (res.nkey) |n| alloc.free(n);
- return res;
- }
- };
- pub const Pub = struct {
- /// The destination subject to publish to.
- subject: []const u8,
- /// The reply subject that subscribers can use to send a response back to the publisher/requestor.
- reply_to: ?[]const u8 = null,
- /// The message payload data.
- payload: Payload,
-
- pub fn deinit(self: Pub, alloc: Allocator) void {
- alloc.free(self.subject);
- self.payload.deinit(alloc);
- if (self.reply_to) |r| alloc.free(r);
- }
-
- pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
- const res: Msg = .{
- .subject = self.subject,
- .sid = sid,
- .reply_to = self.reply_to,
- .payload = self.payload,
- };
- return res.dupe(alloc);
- }
- };
- pub const HPub = struct {
- header_bytes: usize,
- @"pub": Pub,
-
- pub fn deinit(self: HPub, alloc: Allocator) void {
- self.@"pub".deinit(alloc);
- }
-
- pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
- return .{
- .header_bytes = self.header_bytes,
- .msg = try self.@"pub".toMsg(alloc, sid),
- };
- }
- };
-
- pub const HMsg = struct {
- header_bytes: usize,
- msg: Msg,
+const message = @import("./parse/message.zig");
+pub const Message = message.Message;
+pub const Payload = @import("./parse/Payload.zig");
+
+const client_types = StaticStringMap(message.Control).initComptime(
+ .{
+ // {"INFO", .info},
+ .{ @tagName(.CONNECT), .CONNECT },
+ .{ @tagName(.PUB), .PUB },
+ .{ @tagName(.HPUB), .HPUB },
+ .{ @tagName(.SUB), .SUB },
+ .{ @tagName(.UNSUB), .UNSUB },
+ // {"MSG", .msg},
+ // {"HMSG", .hmsg},
+ .{ @tagName(.PING), .PING },
+ .{ @tagName(.PONG), .PONG },
+ // {"+OK", .@"+ok"},
+ // {"-ERR", .@"-err"},
+ },
+);
+fn parseStaticStringMap(input: []const u8) ?message.Control {
+ return client_types.get(input);
+}
- pub fn deinit(self: HMsg, alloc: Allocator) void {
- self.msg.deinit(alloc);
- }
+/// Parse a string into its associated MessageType.
+const parse = parseStaticStringMap;
- pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
- var res = self;
- res.msg = try self.msg.dupe(alloc);
- errdefer alloc.free(res.msg);
- return res;
+/// Get the next Message from the input stream.
+pub fn next(alloc: Allocator, in: *Reader) !Message {
+ var operation_string: ArrayList(u8) = blk: {
+ comptime var buf_len = 0;
+ comptime {
+ for (client_types.keys()) |key| {
+ buf_len = @max(buf_len, key.len);
+ }
}
+ var buf: [buf_len]u8 = undefined;
+ break :blk .initBuffer(&buf);
};
- pub const Sub = struct {
- /// The subject name to subscribe to.
- subject: []const u8,
- /// If specified, the subscriber will join this queue group.
- queue_group: ?[]const u8,
- /// A unique alphanumeric subscription ID, generated by the client.
- sid: []const u8,
- pub fn deinit(self: Sub, alloc: Allocator) void {
- alloc.free(self.subject);
- alloc.free(self.sid);
- if (self.queue_group) |q| alloc.free(q);
- }
- };
- pub const Unsub = struct {
- /// The unique alphanumeric subscription ID of the subject to unsubscribe from.
- sid: []const u8,
- /// A number of messages to wait for before automatically unsubscribing.
- max_msgs: ?usize = null,
+ while (in.peekByte()) |byte| {
+ if (isUpper(byte)) {
+ try operation_string.appendBounded(byte);
+ in.toss(1);
+ } else break;
+ } else |err| return err;
- pub fn deinit(self: Unsub, alloc: Allocator) void {
- alloc.free(self.sid);
- }
+ const operation = parse(operation_string.items) orelse {
+ log.err("Invalid operation: '{s}'", .{operation_string.items});
+ return error.InvalidOperation;
};
- pub const Msg = struct {
- subject: []const u8,
- sid: []const u8,
- reply_to: ?[]const u8,
- payload: Payload,
- pub fn deinit(self: Msg, alloc: Allocator) void {
- alloc.free(self.subject);
- alloc.free(self.sid);
- if (self.reply_to) |r| alloc.free(r);
- self.payload.deinit(alloc);
- }
+ errdefer log.err("Failed to parse {s}", .{operation_string.items});
- pub fn dupe(self: Msg, alloc: Allocator) !Msg {
- var res: Msg = undefined;
- res.subject = try alloc.dupe(u8, self.subject);
- errdefer alloc.free(res.subject);
- res.sid = try alloc.dupe(u8, self.sid);
- errdefer alloc.free(res.sid);
- res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
- errdefer if (res.reply_to) |r| alloc.free(r);
- res.payload = try self.payload.dupe(alloc);
- errdefer alloc.free(res.payload);
- return res;
- }
- };
-
- const client_types = StaticStringMap(MessageType).initComptime(
- .{
- // {"INFO", .info},
- .{ @tagName(.CONNECT), .CONNECT },
- .{ @tagName(.PUB), .PUB },
- .{ @tagName(.HPUB), .HPUB },
- .{ @tagName(.SUB), .SUB },
- .{ @tagName(.UNSUB), .UNSUB },
- // {"MSG", .msg},
- // {"HMSG", .hmsg},
- .{ @tagName(.PING), .PING },
- .{ @tagName(.PONG), .PONG },
- // {"+OK", .@"+ok"},
- // {"-ERR", .@"-err"},
+ switch (operation) {
+ .CONNECT => return connect(alloc, in),
+ .PUB => {
+ @branchHint(.likely);
+ return @"pub"(alloc, in);
},
- );
- fn parseStaticStringMap(input: []const u8) ?MessageType {
- return client_types.get(input);
- }
-
- pub const parse = parseStaticStringMap;
-
- /// An error should be handled by cleaning up this connection.
- pub fn next(alloc: Allocator, in: *Reader) !Message {
- var operation_string: ArrayList(u8) = blk: {
- comptime var buf_len = 0;
- comptime {
- for (client_types.keys()) |key| {
- buf_len = @max(buf_len, key.len);
- }
- }
- var buf: [buf_len]u8 = undefined;
- break :blk .initBuffer(&buf);
- };
-
- while (in.peekByte()) |byte| {
- if (isUpper(byte)) {
- try operation_string.appendBounded(byte);
- in.toss(1);
- } else break;
- } else |err| return err;
-
- const operation = parse(operation_string.items) orelse {
- log.err("Invalid operation: '{s}'", .{operation_string.items});
- return error.InvalidOperation;
- };
-
- errdefer log.err("Failed to parse {s}", .{operation_string.items});
-
- switch (operation) {
- .CONNECT => {
- return parseConnect(alloc, in);
- },
- .PUB => {
- @branchHint(.likely);
- return parsePub(alloc, in);
- },
- .HPUB => {
- @branchHint(.likely);
- return parseHPub(alloc, in);
- },
- .PING => {
- try expectStreamBytes(in, "\r\n");
- return .PING;
- },
- .PONG => {
- try expectStreamBytes(in, "\r\n");
- return .PONG;
- },
- .SUB => {
- return parseSub(alloc, in);
- },
- .UNSUB => {
- return parseUnsub(alloc, in);
- },
- else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
- }
+ .HPUB => {
+ @branchHint(.likely);
+ return hpub(alloc, in);
+ },
+ .PING => {
+ try expectStreamBytes(in, "\r\n");
+ return .PING;
+ },
+ .PONG => {
+ try expectStreamBytes(in, "\r\n");
+ return .PONG;
+ },
+ .SUB => return sub(alloc, in),
+ .UNSUB => return unsub(alloc, in),
+ else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
-};
+}
-fn parseConnect(alloc: Allocator, in: *Reader) !Message {
+pub fn connect(alloc: Allocator, in: *Reader) !Message {
// for storing the json string
var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit();
@@ -366,7 +113,6 @@ fn parseConnect(alloc: Allocator, in: *Reader) !Message {
const connect_str = try connect_string_writer_allocating.toOwnedSlice();
defer alloc.free(connect_str);
- // TODO: should be CONNECTION allocator
const res = try std.json.parseFromSliceLeaky(
Message.Connect,
connect_allocator,
@@ -377,7 +123,7 @@ fn parseConnect(alloc: Allocator, in: *Reader) !Message {
return .{ .CONNECT = try res.dupe(alloc) };
}
-fn parseSub(alloc: Allocator, in: *Reader) !Message {
+pub fn sub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);
@@ -450,102 +196,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message {
};
}
-test parseSub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- {
- var in: Reader = .fixed(" foo 1\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = null,
- .sid = "1",
- },
- },
- res,
- );
- }
- {
- var in: Reader = .fixed(" foo 1\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = null,
- .sid = "1",
- },
- },
- res,
- );
- }
- {
- var in: Reader = .fixed(" foo q 1\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = "q",
- .sid = "1",
- },
- },
- res,
- );
- }
- {
- var in: Reader = .fixed(" 1 q 1\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "1",
- .queue_group = "q",
- .sid = "1",
- },
- },
- res,
- );
- }
- {
- var in: Reader = .fixed(" $SRV.PING 4\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "$SRV.PING",
- .queue_group = null,
- .sid = "4",
- },
- },
- res,
- );
- }
- {
- var in: Reader = .fixed(" foo.echo q 10\r\n");
- var res = try parseSub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo.echo",
- .queue_group = "q",
- .sid = "10",
- },
- },
- res,
- );
- }
-}
-
-fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
+pub fn unsub(alloc: Allocator, in: *Reader) !Message {
const States = enum {
before_first,
in_first,
@@ -610,44 +261,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
};
}
-test parseUnsub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const expectEqual = std.testing.expectEqual;
- {
- var in: Reader = .fixed(" 1\r\n");
- var res = try parseUnsub(alloc, &in);
- defer res.UNSUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .UNSUB = .{
- .sid = "1",
- .max_msgs = null,
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-
- {
- var in: Reader = .fixed(" 1 1\r\n");
- var res = try parseUnsub(alloc, &in);
- defer res.UNSUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .UNSUB = .{
- .sid = "1",
- .max_msgs = 1,
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-}
-
-fn parsePub(alloc: Allocator, in: *Reader) !Message {
+pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
@@ -737,88 +351,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
};
}
-test parsePub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const expectEqual = std.testing.expectEqual;
- {
- var in: Reader = .fixed(" foo 3\r\nbar\r\n");
- var res = try parsePub(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = null,
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-
- {
- var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
- var res = try parsePub(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = "reply.to",
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-
- // numeric reply subject
- {
- var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
- var res = try parsePub(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = "5",
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-}
-
-fn parseHPub(alloc: Allocator, in: *Reader) !Message {
+pub fn hpub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
@@ -939,13 +472,283 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
};
}
-test parseHPub {
+fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
+ var subject_list: ArrayList(u8) = .empty;
+ errdefer subject_list.deinit(alloc);
+
+ // Handle the first character
+ {
+ const byte = try in.takeByte();
+ if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
+ return error.InvalidStream;
+
+ try subject_list.append(alloc, byte);
+ }
+
+ switch (pub_or_sub) {
+ .sub => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '>') {
+ const next_byte = try in.takeByte();
+ if (!isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '*') {
+ const next_byte = try in.peekByte();
+ if (next_byte != '.' and !isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ .@"pub" => {
+ while (in.takeByte()) |byte| {
+ if (isWhitespace(byte)) break;
+ if (byte == '*' or byte == '>') return error.InvalidStream;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ }
+ try subject_list.append(alloc, byte);
+ } else |err| return err;
+ },
+ }
+
+ return subject_list.toOwnedSlice(alloc);
+}
+
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+ if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
+
+test sub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo q 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" 1 q 1\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "1",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" $SRV.PING 4\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "$SRV.PING",
+ .queue_group = null,
+ .sid = "4",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: Reader = .fixed(" foo.echo q 10\r\n");
+ var res = try sub(alloc, &in);
+ defer res.SUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .SUB = .{
+ .subject = "foo.echo",
+ .queue_group = "q",
+ .sid = "10",
+ },
+ },
+ res,
+ );
+ }
+}
+
+test unsub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" 1\r\n");
+ var res = try unsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = null,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" 1 1\r\n");
+ var res = try unsub(alloc, &in);
+ defer res.UNSUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .UNSUB = .{
+ .sid = "1",
+ .max_msgs = 1,
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+test @"pub" {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
+ {
+ var in: Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+
+ // numeric reply subject
+ {
+ var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try @"pub"(alloc, &in);
+ defer res.PUB.deinit(alloc);
+ try expectEqualDeep(
+ Message{
+ .PUB = .{
+ .subject = "foo",
+ .reply_to = "5",
+ .payload = .{
+ .len = 3,
+ .short = blk: {
+ var s: [128]u8 = undefined;
+ @memcpy(s[0..3], "bar");
+ break :blk s;
+ },
+ .long = null,
+ },
+ },
+ },
+ res,
+ );
+ try expectEqual(0, in.buffered().len);
+ }
+}
+
+test hpub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{
var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(alloc, &in);
+ var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
@@ -974,7 +777,7 @@ test parseHPub {
{
var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(alloc, &in);
+ var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
@@ -1003,7 +806,7 @@ test parseHPub {
{
var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(alloc, &in);
+ var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
@@ -1030,112 +833,3 @@ test parseHPub {
try expectEqual(0, in.buffered().len);
}
}
-
-fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
- var subject_list: ArrayList(u8) = .empty;
- errdefer subject_list.deinit(alloc);
-
- // Handle the first character
- {
- const byte = try in.takeByte();
- if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
- return error.InvalidStream;
-
- try subject_list.append(alloc, byte);
- }
-
- switch (pub_or_sub) {
- .sub => {
- while (in.takeByte()) |byte| {
- if (isWhitespace(byte)) break;
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '>') {
- const next_byte = try in.takeByte();
- if (!isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '*') {
- const next_byte = try in.peekByte();
- if (next_byte != '.' and !isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
- } else |err| return err;
- },
- .@"pub" => {
- while (in.takeByte()) |byte| {
- if (isWhitespace(byte)) break;
- if (byte == '*' or byte == '>') return error.InvalidStream;
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
- } else |err| return err;
- },
- }
-
- return subject_list.toOwnedSlice(alloc);
-}
-
-inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
- if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
- @branchHint(.unlikely);
- return error.InvalidStream;
- }
-}
-
-test "parsing a stream" {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
- "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
- ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
- "\nPUB hi 3\r\nfoo\r\n";
- var reader: Reader = .fixed(input);
- var arena: ArenaAllocator = .init(alloc);
- defer arena.deinit();
- const gpa = arena.allocator();
-
- {
- const msg: Message = try Message.next(gpa, &reader);
- const expected: Message = .{
- .CONNECT = .{
- .verbose = false,
- .pedantic = false,
- .tls_required = false,
- .name = "NATS CLI Version v0.2.4",
- .lang = "go",
- .version = "1.43.0",
- .protocol = 1,
- .echo = true,
- .headers = true,
- .no_responders = true,
- },
- };
-
- try expectEqualDeep(expected, msg);
- }
- {
- const msg: Message = try Message.next(gpa, &reader);
- const expected: Message = .{
- .PUB = .{
- .subject = "hi",
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- const str = "foo";
- @memcpy(s[0..str.len], str);
- break :blk s;
- },
- .long = null,
- },
- },
- };
- try expectEqualDeep(expected, msg);
- }
-}
diff --git a/src/Server/parse/Payload.zig b/src/Server/parse/Payload.zig
new file mode 100644
index 0000000..b512a81
--- /dev/null
+++ b/src/Server/parse/Payload.zig
@@ -0,0 +1,51 @@
+const std = @import("std");
+const Reader = std.Io.Reader;
+const Writer = std.Io.Writer;
+const Allocator = std.mem.Allocator;
+
+const Payload = @This();
+
+len: u32,
+short: [128]u8,
+long: ?[]u8,
+
+pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload {
+ var res: Payload = .{
+ .len = @intCast(bytes),
+ .short = undefined,
+ .long = null,
+ };
+
+ try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]);
+ if (bytes > res.short.len) {
+ const long = try alloc.alloc(u8, bytes - res.short.len);
+ errdefer alloc.free(long);
+ try in.readSliceAll(long);
+ res.long = long;
+ }
+ return res;
+}
+
+pub fn write(self: Payload, out: *Writer) !void {
+ std.debug.assert(out.buffer.len >= self.short.len);
+ std.debug.assert(self.len <= self.short.len or self.long != null);
+ try out.writeAll(self.short[0..@min(self.len, self.short.len)]);
+ if (self.long) |l| {
+ try out.writeAll(l);
+ }
+}
+
+pub fn deinit(self: Payload, alloc: Allocator) void {
+ if (self.long) |l| {
+ alloc.free(l);
+ }
+}
+
+pub fn dupe(self: Payload, alloc: Allocator) !Payload {
+ var res = self;
+ if (self.long) |l| {
+ res.long = try alloc.dupe(u8, l);
+ }
+ errdefer if (res.long) |l| alloc.free(l);
+ return res;
+}
diff --git a/src/Server/parse/message.zig b/src/Server/parse/message.zig
new file mode 100644
index 0000000..c8a308f
--- /dev/null
+++ b/src/Server/parse/message.zig
@@ -0,0 +1,208 @@
+const std = @import("std");
+const ArrayList = std.ArrayList;
+const Allocator = std.mem.Allocator;
+const Reader = std.Io.Reader;
+
+const Payload = @import("Payload.zig");
+
+pub const Control = @typeInfo(Message).@"union".tag_type.?;
+
+pub const Message = union(enum) {
+ INFO: ServerInfo,
+ CONNECT: Connect,
+ PUB: Pub,
+ HPUB: HPub,
+ SUB: Sub,
+ UNSUB: Unsub,
+ MSG: Msg,
+ HMSG: HMsg,
+ PING,
+ PONG,
+ @"+OK": void,
+ @"-ERR": []const u8,
+ pub const ServerInfo = struct {
+ /// The unique identifier of the NATS server.
+ server_id: []const u8,
+ /// The name of the NATS server.
+ server_name: []const u8,
+ /// The version of NATS.
+ version: []const u8,
+ /// The version of golang the NATS server was built with.
+ go: []const u8 = "0.0.0",
+ /// The IP address used to start the NATS server,
+ /// by default this will be 0.0.0.0 and can be
+ /// configured with -client_advertise host:port.
+ host: []const u8 = "0.0.0.0",
+ /// The port number the NATS server is configured
+ /// to listen on.
+ port: u16 = 4222,
+ /// Whether the server supports headers.
+ headers: bool = false,
+ /// Maximum payload size, in bytes, that the server
+ /// will accept from the client.
+ max_payload: u64,
+ /// An integer indicating the protocol version of
+ /// the server. The server version 1.2.0 sets this
+ /// to 1 to indicate that it supports the "Echo"
+ /// feature.
+ proto: u32 = 1,
+ };
+ pub const Connect = struct {
+ verbose: bool = false,
+ pedantic: bool = false,
+ tls_required: bool = false,
+ auth_token: ?[]const u8 = null,
+ user: ?[]const u8 = null,
+ pass: ?[]const u8 = null,
+ name: ?[]const u8 = null,
+ lang: []const u8,
+ version: []const u8,
+ protocol: u32,
+ echo: ?bool = null,
+ sig: ?[]const u8 = null,
+ jwt: ?[]const u8 = null,
+ no_responders: ?bool = null,
+ headers: ?bool = null,
+ nkey: ?[]const u8 = null,
+
+ pub fn deinit(self: Connect, alloc: Allocator) void {
+ if (self.auth_token) |a| alloc.free(a);
+ if (self.user) |u| alloc.free(u);
+ if (self.pass) |p| alloc.free(p);
+ if (self.name) |n| alloc.free(n);
+ alloc.free(self.lang);
+ alloc.free(self.version);
+ if (self.sig) |s| alloc.free(s);
+ if (self.jwt) |j| alloc.free(j);
+ if (self.nkey) |n| alloc.free(n);
+ }
+
+ pub fn dupe(self: Connect, alloc: Allocator) !Connect {
+ var res = self;
+ res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null;
+ errdefer if (res.auth_token) |a| alloc.free(a);
+ res.user = if (self.user) |u| try alloc.dupe(u8, u) else null;
+ errdefer if (res.user) |u| alloc.free(u);
+ res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null;
+ errdefer if (res.pass) |p| alloc.free(p);
+ res.name = if (self.name) |n| try alloc.dupe(u8, n) else null;
+ errdefer if (res.name) |n| alloc.free(n);
+ res.lang = try alloc.dupe(u8, self.lang);
+ errdefer alloc.free(res.lang);
+ res.version = try alloc.dupe(u8, self.version);
+ errdefer alloc.free(res.version);
+ res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null;
+ errdefer if (res.sig) |s| alloc.free(s);
+ res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null;
+ errdefer if (res.jwt) |j| alloc.free(j);
+ res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null;
+ errdefer if (res.nkey) |n| alloc.free(n);
+ return res;
+ }
+ };
+ pub const Pub = struct {
+ /// The destination subject to publish to.
+ subject: []const u8,
+ /// The reply subject that subscribers can use to send a response back to the publisher/requestor.
+ reply_to: ?[]const u8 = null,
+ /// The message payload data.
+ payload: Payload,
+
+ pub fn deinit(self: Pub, alloc: Allocator) void {
+ alloc.free(self.subject);
+ self.payload.deinit(alloc);
+ if (self.reply_to) |r| alloc.free(r);
+ }
+
+ pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
+ const res: Msg = .{
+ .subject = self.subject,
+ .sid = sid,
+ .reply_to = self.reply_to,
+ .payload = self.payload,
+ };
+ return res.dupe(alloc);
+ }
+ };
+ pub const HPub = struct {
+ header_bytes: usize,
+ @"pub": Pub,
+
+ pub fn deinit(self: HPub, alloc: Allocator) void {
+ self.@"pub".deinit(alloc);
+ }
+
+ pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
+ return .{
+ .header_bytes = self.header_bytes,
+ .msg = try self.@"pub".toMsg(alloc, sid),
+ };
+ }
+ };
+
+ pub const HMsg = struct {
+ header_bytes: usize,
+ msg: Msg,
+
+ pub fn deinit(self: HMsg, alloc: Allocator) void {
+ self.msg.deinit(alloc);
+ }
+
+ pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
+ var res = self;
+ res.msg = try self.msg.dupe(alloc);
+ errdefer alloc.free(res.msg);
+ return res;
+ }
+ };
+ pub const Sub = struct {
+ /// The subject name to subscribe to.
+ subject: []const u8,
+ /// If specified, the subscriber will join this queue group.
+ queue_group: ?[]const u8,
+ /// A unique alphanumeric subscription ID, generated by the client.
+ sid: []const u8,
+
+ pub fn deinit(self: Sub, alloc: Allocator) void {
+ alloc.free(self.subject);
+ alloc.free(self.sid);
+ if (self.queue_group) |q| alloc.free(q);
+ }
+ };
+ pub const Unsub = struct {
+ /// The unique alphanumeric subscription ID of the subject to unsubscribe from.
+ sid: []const u8,
+ /// A number of messages to wait for before automatically unsubscribing.
+ max_msgs: ?usize = null,
+
+ pub fn deinit(self: Unsub, alloc: Allocator) void {
+ alloc.free(self.sid);
+ }
+ };
+ pub const Msg = struct {
+ subject: []const u8,
+ sid: []const u8,
+ reply_to: ?[]const u8,
+ payload: Payload,
+
+ pub fn deinit(self: Msg, alloc: Allocator) void {
+ alloc.free(self.subject);
+ alloc.free(self.sid);
+ if (self.reply_to) |r| alloc.free(r);
+ self.payload.deinit(alloc);
+ }
+
+ pub fn dupe(self: Msg, alloc: Allocator) !Msg {
+ var res: Msg = undefined;
+ res.subject = try alloc.dupe(u8, self.subject);
+ errdefer alloc.free(res.subject);
+ res.sid = try alloc.dupe(u8, self.sid);
+ errdefer alloc.free(res.sid);
+ res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
+ errdefer if (res.reply_to) |r| alloc.free(r);
+ res.payload = try self.payload.dupe(alloc);
+ errdefer alloc.free(res.payload);
+ return res;
+ }
+ };
+};
diff --git a/src/main.zig b/src/main.zig
index a413fba..7784df8 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -25,7 +25,7 @@ pub fn main() !void {
'a',
std.fmt.comptimePrint(
"Address to bind to (default: {s})",
- .{std.meta.fieldInfo(zits.Server.Message.ServerInfo, .host).defaultValue().?},
+ .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .host).defaultValue().?},
),
),
yazap.Arg.singleValueOption(
@@ -33,7 +33,7 @@ pub fn main() !void {
'p',
std.fmt.comptimePrint(
"Port to listen on (default: {d})",
- .{std.meta.fieldInfo(zits.Server.Message.ServerInfo, .port).defaultValue().?},
+ .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .port).defaultValue().?},
),
),
yazap.Arg.singleValueOption(
@@ -54,7 +54,7 @@ pub fn main() !void {
const matches = try app.parseProcess(io);
if (matches.subcommandMatches("serve")) |serve_matches| {
- var info: zits.Server.Message.ServerInfo = .{
+ var info: zits.Server.parse.Message.ServerInfo = .{
.server_id = zits.Server.default_id,
.server_name = zits.Server.default_name,
.version = "zits-master",
diff --git a/src/subcommand/server.zig b/src/subcommand/server.zig
index 1aaf572..02a96e5 100644
--- a/src/subcommand/server.zig
+++ b/src/subcommand/server.zig
@@ -10,7 +10,7 @@ const Threaded = Io.Threaded;
const builtin = @import("builtin");
const zits = @import("zits");
-const Message = zits.Server.Message;
+const Message = zits.Server.parse.Message;
const ServerInfo = Message.ServerInfo;
const Server = zits.Server;