diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 21:56:39 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 21:59:41 -0500 |
| commit | 48969283527e0db6b71893b2b3f3bbeb21e522db (patch) | |
| tree | dee322b777c79ef1cf7133bb65f226488b2cdab7 | |
| parent | cc036318387cc5c44f2a0a2a1e28d067f3e6bdf6 (diff) | |
Major restructuring
This makes things much easier to use as a library
| -rw-r--r-- | src/Server.zig | 6 | ||||
| -rw-r--r-- | src/Server/Client.zig | 5 | ||||
| -rw-r--r-- | src/Server/parse.zig (renamed from src/Server/message_parser.zig) | 1006 | ||||
| -rw-r--r-- | src/Server/parse/Payload.zig | 51 | ||||
| -rw-r--r-- | src/Server/parse/message.zig | 208 | ||||
| -rw-r--r-- | src/main.zig | 6 | ||||
| -rw-r--r-- | src/subcommand/server.zig | 2 |
7 files changed, 619 insertions, 665 deletions
diff --git a/src/Server.zig b/src/Server.zig index 85ddd9e..c2cc17d 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -13,10 +13,10 @@ const Stream = std.Io.net.Stream; pub const Client = @import("./Server/Client.zig"); -const message_parser = @import("./Server/message_parser.zig"); +pub const parse = @import("./Server/parse.zig"); -pub const MessageType = message_parser.MessageType; -pub const Message = message_parser.Message; +const MessageType = parse.MessageType; +const Message = parse.Message; const ServerInfo = Message.ServerInfo; const Msgs = Client.Msgs; diff --git a/src/Server/Client.zig b/src/Server/Client.zig index dff3534..9ec928c 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -1,4 +1,5 @@ -const Message = @import("message_parser.zig").Message; +const parse = @import("parse.zig"); +const Message = parse.Message; const std = @import("std"); const Queue = std.Io.Queue; @@ -193,7 +194,7 @@ test send { } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { - return Message.next(allocator, self.from_client); + return parse.next(allocator, self.from_client); } test next { diff --git a/src/Server/message_parser.zig b/src/Server/parse.zig index fd1b5b1..d58c0e5 100644 --- a/src/Server/message_parser.zig +++ b/src/Server/parse.zig @@ -1,352 +1,99 @@ const std = @import("std"); -const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; +const Allocator = std.mem.Allocator; const ArrayList = std.ArrayList; +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const AllocatingWriter = std.Io.Writer.Allocating; const StaticStringMap = std.StaticStringMap; -const Io = std.Io; -const Writer = Io.Writer; -const AllocatingWriter = Writer.Allocating; -const Reader = Io.Reader; +const log = std.log; -const ascii = std.ascii; const isDigit = std.ascii.isDigit; const isUpper = std.ascii.isUpper; const isWhitespace = std.ascii.isWhitespace; const parseUnsigned = std.fmt.parseUnsigned; -const log = std.log; - -pub const Payload = struct { - len: u32, - short: [128]u8, - long: ?[]u8, - - pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { - var res: Payload = .{ - .len = @intCast(bytes), - .short = undefined, - .long = null, - }; - - try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); - if (bytes > res.short.len) { - const long = try alloc.alloc(u8, bytes - res.short.len); - errdefer alloc.free(long); - try in.readSliceAll(long); - res.long = long; - } - return res; - } - - pub fn write(self: Payload, out: *Writer) !void { - std.debug.assert(out.buffer.len >= self.short.len); - std.debug.assert(self.len <= self.short.len or self.long != null); - try out.writeAll(self.short[0..@min(self.len, self.short.len)]); - if (self.long) |l| { - try out.writeAll(l); - } - } - - pub fn deinit(self: Payload, alloc: Allocator) void { - if (self.long) |l| { - alloc.free(l); - } - } - - pub fn dupe(self: Payload, alloc: Allocator) !Payload { - var res = self; - if (self.long) |l| { - res.long = try alloc.dupe(u8, l); - } - errdefer if (res.long) |l| alloc.free(l); - return res; - } -}; - -pub const MessageType = @typeInfo(Message).@"union".tag_type.?; - -pub const Message = union(enum) { - INFO: ServerInfo, - CONNECT: Connect, - PUB: Pub, - HPUB: HPub, - SUB: Sub, - UNSUB: Unsub, - MSG: Msg, - HMSG: HMsg, - PING, - PONG, - @"+OK": void, - @"-ERR": []const u8, - pub const ServerInfo = struct { - /// The unique identifier of the NATS server. - server_id: []const u8, - /// The name of the NATS server. - server_name: []const u8, - /// The version of NATS. - version: []const u8, - /// The version of golang the NATS server was built with. - go: []const u8 = "0.0.0", - /// The IP address used to start the NATS server, - /// by default this will be 0.0.0.0 and can be - /// configured with -client_advertise host:port. - host: []const u8 = "0.0.0.0", - /// The port number the NATS server is configured - /// to listen on. - port: u16 = 4222, - /// Whether the server supports headers. - headers: bool = false, - /// Maximum payload size, in bytes, that the server - /// will accept from the client. - max_payload: u64, - /// An integer indicating the protocol version of - /// the server. The server version 1.2.0 sets this - /// to 1 to indicate that it supports the "Echo" - /// feature. - proto: u32 = 1, - }; - pub const Connect = struct { - verbose: bool = false, - pedantic: bool = false, - tls_required: bool = false, - auth_token: ?[]const u8 = null, - user: ?[]const u8 = null, - pass: ?[]const u8 = null, - name: ?[]const u8 = null, - lang: []const u8, - version: []const u8, - protocol: u32, - echo: ?bool = null, - sig: ?[]const u8 = null, - jwt: ?[]const u8 = null, - no_responders: ?bool = null, - headers: ?bool = null, - nkey: ?[]const u8 = null, - - pub fn deinit(self: Connect, alloc: Allocator) void { - if (self.auth_token) |a| alloc.free(a); - if (self.user) |u| alloc.free(u); - if (self.pass) |p| alloc.free(p); - if (self.name) |n| alloc.free(n); - alloc.free(self.lang); - alloc.free(self.version); - if (self.sig) |s| alloc.free(s); - if (self.jwt) |j| alloc.free(j); - if (self.nkey) |n| alloc.free(n); - } - - pub fn dupe(self: Connect, alloc: Allocator) !Connect { - var res = self; - res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; - errdefer if (res.auth_token) |a| alloc.free(a); - res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; - errdefer if (res.user) |u| alloc.free(u); - res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; - errdefer if (res.pass) |p| alloc.free(p); - res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.name) |n| alloc.free(n); - res.lang = try alloc.dupe(u8, self.lang); - errdefer alloc.free(res.lang); - res.version = try alloc.dupe(u8, self.version); - errdefer alloc.free(res.version); - res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; - errdefer if (res.sig) |s| alloc.free(s); - res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; - errdefer if (res.jwt) |j| alloc.free(j); - res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; - errdefer if (res.nkey) |n| alloc.free(n); - return res; - } - }; - pub const Pub = struct { - /// The destination subject to publish to. - subject: []const u8, - /// The reply subject that subscribers can use to send a response back to the publisher/requestor. - reply_to: ?[]const u8 = null, - /// The message payload data. - payload: Payload, - - pub fn deinit(self: Pub, alloc: Allocator) void { - alloc.free(self.subject); - self.payload.deinit(alloc); - if (self.reply_to) |r| alloc.free(r); - } - - pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { - const res: Msg = .{ - .subject = self.subject, - .sid = sid, - .reply_to = self.reply_to, - .payload = self.payload, - }; - return res.dupe(alloc); - } - }; - pub const HPub = struct { - header_bytes: usize, - @"pub": Pub, - - pub fn deinit(self: HPub, alloc: Allocator) void { - self.@"pub".deinit(alloc); - } - - pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { - return .{ - .header_bytes = self.header_bytes, - .msg = try self.@"pub".toMsg(alloc, sid), - }; - } - }; - - pub const HMsg = struct { - header_bytes: usize, - msg: Msg, +const message = @import("./parse/message.zig"); +pub const Message = message.Message; +pub const Payload = @import("./parse/Payload.zig"); + +const client_types = StaticStringMap(message.Control).initComptime( + .{ + // {"INFO", .info}, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, + // {"MSG", .msg}, + // {"HMSG", .hmsg}, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, + // {"+OK", .@"+ok"}, + // {"-ERR", .@"-err"}, + }, +); +fn parseStaticStringMap(input: []const u8) ?message.Control { + return client_types.get(input); +} - pub fn deinit(self: HMsg, alloc: Allocator) void { - self.msg.deinit(alloc); - } +/// Parse a string into its associated MessageType. +const parse = parseStaticStringMap; - pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { - var res = self; - res.msg = try self.msg.dupe(alloc); - errdefer alloc.free(res.msg); - return res; +/// Get the next Message from the input stream. +pub fn next(alloc: Allocator, in: *Reader) !Message { + var operation_string: ArrayList(u8) = blk: { + comptime var buf_len = 0; + comptime { + for (client_types.keys()) |key| { + buf_len = @max(buf_len, key.len); + } } + var buf: [buf_len]u8 = undefined; + break :blk .initBuffer(&buf); }; - pub const Sub = struct { - /// The subject name to subscribe to. - subject: []const u8, - /// If specified, the subscriber will join this queue group. - queue_group: ?[]const u8, - /// A unique alphanumeric subscription ID, generated by the client. - sid: []const u8, - pub fn deinit(self: Sub, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.queue_group) |q| alloc.free(q); - } - }; - pub const Unsub = struct { - /// The unique alphanumeric subscription ID of the subject to unsubscribe from. - sid: []const u8, - /// A number of messages to wait for before automatically unsubscribing. - max_msgs: ?usize = null, + while (in.peekByte()) |byte| { + if (isUpper(byte)) { + try operation_string.appendBounded(byte); + in.toss(1); + } else break; + } else |err| return err; - pub fn deinit(self: Unsub, alloc: Allocator) void { - alloc.free(self.sid); - } + const operation = parse(operation_string.items) orelse { + log.err("Invalid operation: '{s}'", .{operation_string.items}); + return error.InvalidOperation; }; - pub const Msg = struct { - subject: []const u8, - sid: []const u8, - reply_to: ?[]const u8, - payload: Payload, - pub fn deinit(self: Msg, alloc: Allocator) void { - alloc.free(self.subject); - alloc.free(self.sid); - if (self.reply_to) |r| alloc.free(r); - self.payload.deinit(alloc); - } + errdefer log.err("Failed to parse {s}", .{operation_string.items}); - pub fn dupe(self: Msg, alloc: Allocator) !Msg { - var res: Msg = undefined; - res.subject = try alloc.dupe(u8, self.subject); - errdefer alloc.free(res.subject); - res.sid = try alloc.dupe(u8, self.sid); - errdefer alloc.free(res.sid); - res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; - errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try self.payload.dupe(alloc); - errdefer alloc.free(res.payload); - return res; - } - }; - - const client_types = StaticStringMap(MessageType).initComptime( - .{ - // {"INFO", .info}, - .{ @tagName(.CONNECT), .CONNECT }, - .{ @tagName(.PUB), .PUB }, - .{ @tagName(.HPUB), .HPUB }, - .{ @tagName(.SUB), .SUB }, - .{ @tagName(.UNSUB), .UNSUB }, - // {"MSG", .msg}, - // {"HMSG", .hmsg}, - .{ @tagName(.PING), .PING }, - .{ @tagName(.PONG), .PONG }, - // {"+OK", .@"+ok"}, - // {"-ERR", .@"-err"}, + switch (operation) { + .CONNECT => return connect(alloc, in), + .PUB => { + @branchHint(.likely); + return @"pub"(alloc, in); }, - ); - fn parseStaticStringMap(input: []const u8) ?MessageType { - return client_types.get(input); - } - - pub const parse = parseStaticStringMap; - - /// An error should be handled by cleaning up this connection. - pub fn next(alloc: Allocator, in: *Reader) !Message { - var operation_string: ArrayList(u8) = blk: { - comptime var buf_len = 0; - comptime { - for (client_types.keys()) |key| { - buf_len = @max(buf_len, key.len); - } - } - var buf: [buf_len]u8 = undefined; - break :blk .initBuffer(&buf); - }; - - while (in.peekByte()) |byte| { - if (isUpper(byte)) { - try operation_string.appendBounded(byte); - in.toss(1); - } else break; - } else |err| return err; - - const operation = parse(operation_string.items) orelse { - log.err("Invalid operation: '{s}'", .{operation_string.items}); - return error.InvalidOperation; - }; - - errdefer log.err("Failed to parse {s}", .{operation_string.items}); - - switch (operation) { - .CONNECT => { - return parseConnect(alloc, in); - }, - .PUB => { - @branchHint(.likely); - return parsePub(alloc, in); - }, - .HPUB => { - @branchHint(.likely); - return parseHPub(alloc, in); - }, - .PING => { - try expectStreamBytes(in, "\r\n"); - return .PING; - }, - .PONG => { - try expectStreamBytes(in, "\r\n"); - return .PONG; - }, - .SUB => { - return parseSub(alloc, in); - }, - .UNSUB => { - return parseUnsub(alloc, in); - }, - else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), - } + .HPUB => { + @branchHint(.likely); + return hpub(alloc, in); + }, + .PING => { + try expectStreamBytes(in, "\r\n"); + return .PING; + }, + .PONG => { + try expectStreamBytes(in, "\r\n"); + return .PONG; + }, + .SUB => return sub(alloc, in), + .UNSUB => return unsub(alloc, in), + else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), } -}; +} -fn parseConnect(alloc: Allocator, in: *Reader) !Message { +pub fn connect(alloc: Allocator, in: *Reader) !Message { // for storing the json string var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); @@ -366,7 +113,6 @@ fn parseConnect(alloc: Allocator, in: *Reader) !Message { const connect_str = try connect_string_writer_allocating.toOwnedSlice(); defer alloc.free(connect_str); - // TODO: should be CONNECTION allocator const res = try std.json.parseFromSliceLeaky( Message.Connect, connect_allocator, @@ -377,7 +123,7 @@ fn parseConnect(alloc: Allocator, in: *Reader) !Message { return .{ .CONNECT = try res.dupe(alloc) }; } -fn parseSub(alloc: Allocator, in: *Reader) !Message { +pub fn sub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space const subject = try readSubject(alloc, in, .sub); @@ -450,102 +196,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message { }; } -test parseSub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = null, - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo q 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" 1 q 1\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "1", - .queue_group = "q", - .sid = "1", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" $SRV.PING 4\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "$SRV.PING", - .queue_group = null, - .sid = "4", - }, - }, - res, - ); - } - { - var in: Reader = .fixed(" foo.echo q 10\r\n"); - var res = try parseSub(alloc, &in); - defer res.SUB.deinit(alloc); - try expectEqualDeep( - Message{ - .SUB = .{ - .subject = "foo.echo", - .queue_group = "q", - .sid = "10", - }, - }, - res, - ); - } -} - -fn parseUnsub(alloc: Allocator, in: *Reader) !Message { +pub fn unsub(alloc: Allocator, in: *Reader) !Message { const States = enum { before_first, in_first, @@ -610,44 +261,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message { }; } -test parseUnsub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" 1\r\n"); - var res = try parseUnsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = null, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" 1 1\r\n"); - var res = try parseUnsub(alloc, &in); - defer res.UNSUB.deinit(alloc); - try expectEqualDeep( - Message{ - .UNSUB = .{ - .sid = "1", - .max_msgs = 1, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} - -fn parsePub(alloc: Allocator, in: *Reader) !Message { +pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space // Parse subject @@ -737,88 +351,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { }; } -test parsePub { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const expectEqual = std.testing.expectEqual; - { - var in: Reader = .fixed(" foo 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = null, - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - { - var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "reply.to", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } - - // numeric reply subject - { - var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); - var res = try parsePub(alloc, &in); - defer res.PUB.deinit(alloc); - try expectEqualDeep( - Message{ - .PUB = .{ - .subject = "foo", - .reply_to = "5", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - @memcpy(s[0..3], "bar"); - break :blk s; - }, - .long = null, - }, - }, - }, - res, - ); - try expectEqual(0, in.buffered().len); - } -} - -fn parseHPub(alloc: Allocator, in: *Reader) !Message { +pub fn hpub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space // Parse subject @@ -939,13 +472,283 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { }; } -test parseHPub { +fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { + var subject_list: ArrayList(u8) = .empty; + errdefer subject_list.deinit(alloc); + + // Handle the first character + { + const byte = try in.takeByte(); + if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) + return error.InvalidStream; + + try subject_list.append(alloc, byte); + } + + switch (pub_or_sub) { + .sub => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '>') { + const next_byte = try in.takeByte(); + if (!isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '*') { + const next_byte = try in.peekByte(); + if (next_byte != '.' and !isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + .@"pub" => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '*' or byte == '>') return error.InvalidStream; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + } + + return subject_list.toOwnedSlice(alloc); +} + +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { + if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + +test sub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo q 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" 1 q 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "1", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "$SRV.PING", + .queue_group = null, + .sid = "4", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo.echo q 10\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } +} + +test unsub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" 1\r\n"); + var res = try unsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = null, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" 1 1\r\n"); + var res = try unsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = 1, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +test @"pub" { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + // numeric reply subject + { + var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "5", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +test hpub { const alloc = std.testing.allocator; const expectEqualDeep = std.testing.expectEqualDeep; const expectEqual = std.testing.expectEqual; { var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); + var res = try hpub(alloc, &in); defer res.HPUB.deinit(alloc); try expectEqualDeep( Message{ @@ -974,7 +777,7 @@ test parseHPub { { var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); + var res = try hpub(alloc, &in); defer res.HPUB.deinit(alloc); try expectEqualDeep( Message{ @@ -1003,7 +806,7 @@ test parseHPub { { var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(alloc, &in); + var res = try hpub(alloc, &in); defer res.HPUB.deinit(alloc); try expectEqualDeep( Message{ @@ -1030,112 +833,3 @@ test parseHPub { try expectEqual(0, in.buffered().len); } } - -fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { - var subject_list: ArrayList(u8) = .empty; - errdefer subject_list.deinit(alloc); - - // Handle the first character - { - const byte = try in.takeByte(); - if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) - return error.InvalidStream; - - try subject_list.append(alloc, byte); - } - - switch (pub_or_sub) { - .sub => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '>') { - const next_byte = try in.takeByte(); - if (!isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '*') { - const next_byte = try in.peekByte(); - if (next_byte != '.' and !isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - .@"pub" => { - while (in.takeByte()) |byte| { - if (isWhitespace(byte)) break; - if (byte == '*' or byte == '>') return error.InvalidStream; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); - } else |err| return err; - }, - } - - return subject_list.toOwnedSlice(alloc); -} - -inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { - if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { - @branchHint(.unlikely); - return error.InvalidStream; - } -} - -test "parsing a stream" { - const alloc = std.testing.allocator; - const expectEqualDeep = std.testing.expectEqualDeep; - const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ - "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ - ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ - "\nPUB hi 3\r\nfoo\r\n"; - var reader: Reader = .fixed(input); - var arena: ArenaAllocator = .init(alloc); - defer arena.deinit(); - const gpa = arena.allocator(); - - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ - .CONNECT = .{ - .verbose = false, - .pedantic = false, - .tls_required = false, - .name = "NATS CLI Version v0.2.4", - .lang = "go", - .version = "1.43.0", - .protocol = 1, - .echo = true, - .headers = true, - .no_responders = true, - }, - }; - - try expectEqualDeep(expected, msg); - } - { - const msg: Message = try Message.next(gpa, &reader); - const expected: Message = .{ - .PUB = .{ - .subject = "hi", - .payload = .{ - .len = 3, - .short = blk: { - var s: [128]u8 = undefined; - const str = "foo"; - @memcpy(s[0..str.len], str); - break :blk s; - }, - .long = null, - }, - }, - }; - try expectEqualDeep(expected, msg); - } -} diff --git a/src/Server/parse/Payload.zig b/src/Server/parse/Payload.zig new file mode 100644 index 0000000..b512a81 --- /dev/null +++ b/src/Server/parse/Payload.zig @@ -0,0 +1,51 @@ +const std = @import("std"); +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const Allocator = std.mem.Allocator; + +const Payload = @This(); + +len: u32, +short: [128]u8, +long: ?[]u8, + +pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; +} + +pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } +} + +pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } +} + +pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; +} diff --git a/src/Server/parse/message.zig b/src/Server/parse/message.zig new file mode 100644 index 0000000..c8a308f --- /dev/null +++ b/src/Server/parse/message.zig @@ -0,0 +1,208 @@ +const std = @import("std"); +const ArrayList = std.ArrayList; +const Allocator = std.mem.Allocator; +const Reader = std.Io.Reader; + +const Payload = @import("Payload.zig"); + +pub const Control = @typeInfo(Message).@"union".tag_type.?; + +pub const Message = union(enum) { + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + + pub fn deinit(self: Connect, alloc: Allocator) void { + if (self.auth_token) |a| alloc.free(a); + if (self.user) |u| alloc.free(u); + if (self.pass) |p| alloc.free(p); + if (self.name) |n| alloc.free(n); + alloc.free(self.lang); + alloc.free(self.version); + if (self.sig) |s| alloc.free(s); + if (self.jwt) |j| alloc.free(j); + if (self.nkey) |n| alloc.free(n); + } + + pub fn dupe(self: Connect, alloc: Allocator) !Connect { + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.lang = try alloc.dupe(u8, self.lang); + errdefer alloc.free(res.lang); + res.version = try alloc.dupe(u8, self.version); + errdefer alloc.free(res.version); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; + } + }; + pub const Pub = struct { + /// The destination subject to publish to. + subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. + reply_to: ?[]const u8 = null, + /// The message payload data. + payload: Payload, + + pub fn deinit(self: Pub, alloc: Allocator) void { + alloc.free(self.subject); + self.payload.deinit(alloc); + if (self.reply_to) |r| alloc.free(r); + } + + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + const res: Msg = .{ + .subject = self.subject, + .sid = sid, + .reply_to = self.reply_to, + .payload = self.payload, + }; + return res.dupe(alloc); + } + }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); + return res; + } + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + + pub fn deinit(self: Sub, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } + }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: Allocator) void { + alloc.free(self.sid); + } + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, + payload: Payload, + + pub fn deinit(self: Msg, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.reply_to) |r| alloc.free(r); + self.payload.deinit(alloc); + } + + pub fn dupe(self: Msg, alloc: Allocator) !Msg { + var res: Msg = undefined; + res.subject = try alloc.dupe(u8, self.subject); + errdefer alloc.free(res.subject); + res.sid = try alloc.dupe(u8, self.sid); + errdefer alloc.free(res.sid); + res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; + errdefer if (res.reply_to) |r| alloc.free(r); + res.payload = try self.payload.dupe(alloc); + errdefer alloc.free(res.payload); + return res; + } + }; +}; diff --git a/src/main.zig b/src/main.zig index a413fba..7784df8 100644 --- a/src/main.zig +++ b/src/main.zig @@ -25,7 +25,7 @@ pub fn main() !void { 'a', std.fmt.comptimePrint( "Address to bind to (default: {s})", - .{std.meta.fieldInfo(zits.Server.Message.ServerInfo, .host).defaultValue().?}, + .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .host).defaultValue().?}, ), ), yazap.Arg.singleValueOption( @@ -33,7 +33,7 @@ pub fn main() !void { 'p', std.fmt.comptimePrint( "Port to listen on (default: {d})", - .{std.meta.fieldInfo(zits.Server.Message.ServerInfo, .port).defaultValue().?}, + .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .port).defaultValue().?}, ), ), yazap.Arg.singleValueOption( @@ -54,7 +54,7 @@ pub fn main() !void { const matches = try app.parseProcess(io); if (matches.subcommandMatches("serve")) |serve_matches| { - var info: zits.Server.Message.ServerInfo = .{ + var info: zits.Server.parse.Message.ServerInfo = .{ .server_id = zits.Server.default_id, .server_name = zits.Server.default_name, .version = "zits-master", diff --git a/src/subcommand/server.zig b/src/subcommand/server.zig index 1aaf572..02a96e5 100644 --- a/src/subcommand/server.zig +++ b/src/subcommand/server.zig @@ -10,7 +10,7 @@ const Threaded = Io.Threaded; const builtin = @import("builtin"); const zits = @import("zits"); -const Message = zits.Server.Message; +const Message = zits.Server.parse.Message; const ServerInfo = Message.ServerInfo; const Server = zits.Server; |
