diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 21:56:39 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 21:59:41 -0500 |
| commit | 48969283527e0db6b71893b2b3f3bbeb21e522db (patch) | |
| tree | dee322b777c79ef1cf7133bb65f226488b2cdab7 /src/Server/parse | |
| parent | cc036318387cc5c44f2a0a2a1e28d067f3e6bdf6 (diff) | |
Major restructuring
This makes things much easier to use as a library
Diffstat (limited to 'src/Server/parse')
| -rw-r--r-- | src/Server/parse/Payload.zig | 51 | ||||
| -rw-r--r-- | src/Server/parse/message.zig | 208 |
2 files changed, 259 insertions, 0 deletions
diff --git a/src/Server/parse/Payload.zig b/src/Server/parse/Payload.zig new file mode 100644 index 0000000..b512a81 --- /dev/null +++ b/src/Server/parse/Payload.zig @@ -0,0 +1,51 @@ +const std = @import("std"); +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const Allocator = std.mem.Allocator; + +const Payload = @This(); + +len: u32, +short: [128]u8, +long: ?[]u8, + +pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; +} + +pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } +} + +pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } +} + +pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; +} diff --git a/src/Server/parse/message.zig b/src/Server/parse/message.zig new file mode 100644 index 0000000..c8a308f --- /dev/null +++ b/src/Server/parse/message.zig @@ -0,0 +1,208 @@ +const std = @import("std"); +const ArrayList = std.ArrayList; +const Allocator = std.mem.Allocator; +const Reader = std.Io.Reader; + +const Payload = @import("Payload.zig"); + +pub const Control = @typeInfo(Message).@"union".tag_type.?; + +pub const Message = union(enum) { + INFO: ServerInfo, + CONNECT: Connect, + PUB: Pub, + HPUB: HPub, + SUB: Sub, + UNSUB: Unsub, + MSG: Msg, + HMSG: HMsg, + PING, + PONG, + @"+OK": void, + @"-ERR": []const u8, + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + + pub fn deinit(self: Connect, alloc: Allocator) void { + if (self.auth_token) |a| alloc.free(a); + if (self.user) |u| alloc.free(u); + if (self.pass) |p| alloc.free(p); + if (self.name) |n| alloc.free(n); + alloc.free(self.lang); + alloc.free(self.version); + if (self.sig) |s| alloc.free(s); + if (self.jwt) |j| alloc.free(j); + if (self.nkey) |n| alloc.free(n); + } + + pub fn dupe(self: Connect, alloc: Allocator) !Connect { + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.lang = try alloc.dupe(u8, self.lang); + errdefer alloc.free(res.lang); + res.version = try alloc.dupe(u8, self.version); + errdefer alloc.free(res.version); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; + } + }; + pub const Pub = struct { + /// The destination subject to publish to. + subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. + reply_to: ?[]const u8 = null, + /// The message payload data. + payload: Payload, + + pub fn deinit(self: Pub, alloc: Allocator) void { + alloc.free(self.subject); + self.payload.deinit(alloc); + if (self.reply_to) |r| alloc.free(r); + } + + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + const res: Msg = .{ + .subject = self.subject, + .sid = sid, + .reply_to = self.reply_to, + .payload = self.payload, + }; + return res.dupe(alloc); + } + }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); + return res; + } + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + + pub fn deinit(self: Sub, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.queue_group) |q| alloc.free(q); + } + }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + + pub fn deinit(self: Unsub, alloc: Allocator) void { + alloc.free(self.sid); + } + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, + payload: Payload, + + pub fn deinit(self: Msg, alloc: Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + if (self.reply_to) |r| alloc.free(r); + self.payload.deinit(alloc); + } + + pub fn dupe(self: Msg, alloc: Allocator) !Msg { + var res: Msg = undefined; + res.subject = try alloc.dupe(u8, self.subject); + errdefer alloc.free(res.subject); + res.sid = try alloc.dupe(u8, self.sid); + errdefer alloc.free(res.sid); + res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; + errdefer if (res.reply_to) |r| alloc.free(r); + res.payload = try self.payload.dupe(alloc); + errdefer alloc.free(res.payload); + return res; + } + }; +}; |
