diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-12-02 19:53:03 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-12-02 19:53:03 -0500 |
| commit | aceb671ddc3e4ff3ce15c2e9814538e4f21d7d12 (patch) | |
| tree | b9624d8c3ffe569735d1374ec9ce42b68258a5ca /src/server/message_parser.zig | |
| parent | 41f4ee721b138304294b185185dc6fc51549c5b9 (diff) | |
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 154 |
1 files changed, 116 insertions, 38 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 75e13d2..1a6b213 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -32,20 +32,48 @@ pub const MessageType = enum { }; pub const Message = union(enum) { - info: void, - + /// TODO: REMOVE + not_real: void, + info: ServerInfo, connect: Connect, @"pub": Pub, hpub: void, - sub: void, + sub: Sub, unsub: void, - msg: void, + msg: Msg, hmsg: void, ping, pong, @"+ok": void, @"-err": void, - const Connect = struct { + pub const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 4222, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, + }; + pub const Connect = struct { verbose: bool = false, pedantic: bool = false, tls_required: bool = false, @@ -63,9 +91,26 @@ pub const Message = union(enum) { headers: ?bool = null, nkey: ?[]const u8 = null, }; - const Pub = struct { + pub const Pub = struct { + /// The destination subject to publish to. subject: []const u8, + /// The reply subject that subscribers can use to send a response back to the publisher/requestor. reply_to: ?[]const u8 = null, + /// The message payload data. + payload: []const u8, + }; + pub const Sub = struct { + /// The subject name to subscribe to. + subject: []const u8, + /// If specified, the subscriber will join this queue group. + queue_group: ?[]const u8, + /// A unique alphanumeric subscription ID, generated by the client. + sid: []const u8, + }; + pub const Msg = struct { + subject: []const u8, + sid: []const u8, + reply_to: ?[]const u8, payload: []const u8, }; @@ -132,32 +177,7 @@ pub const Message = union(enum) { try in.discardAll(1); // throw away space // Parse subject - const subject: []const u8 = blk: { - // TODO: should be ARENA allocator - var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); - - // Handle the first character - { - const byte = try in.takeByte(); - if (byte == '.' or std.ascii.isWhitespace(byte)) - return error.InvalidSubject; - - try subject_list.append(alloc, byte); - } - - while (in.takeByte() catch null) |byte| { - if (std.ascii.isWhitespace(byte)) break; - if (std.ascii.isAscii(byte)) { - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) - return error.InvalidSubject; - } - try subject_list.append(alloc, byte); - } - } else return error.InvalidStream; - break :blk subject_list.items; - }; + const subject: []const u8 = try readSubject(alloc, in); // Parse byte count const byte_count = blk: { @@ -186,12 +206,12 @@ pub const Message = union(enum) { break :blk bytes; }; - std.debug.print("buffer: '{s}'\n", .{in.buffered()}); - // return std.debug.panic("not implemented", .{}); - return .{ .@"pub" = .{ - .subject = subject, - .payload = payload, - } }; + return .{ + .@"pub" = .{ + .subject = subject, + .payload = payload, + }, + }; }, .ping => { std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); @@ -201,11 +221,69 @@ pub const Message = union(enum) { std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); return .pong; }, + .sub => { + std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + const subject = try readSubject(alloc, in); + const second = blk: { + // Drop whitespace + while (in.peekByte() catch null) |byte| { + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + } else break; + } else return error.InvalidStream; + + var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32); + while (in.takeByte() catch null) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + } else return error.InvalidStream; + + break :blk try acc.toOwnedSlice(alloc); + }; + const queue_group = if ((try in.peekByte()) != '\r') second else null; + const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second; + std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + return .{ + .sub = .{ + .subject = subject, + .queue_group = queue_group, + .sid = sid, + }, + }; + }, else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), } } }; +fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { + // TODO: should be ARENA allocator + var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); + + // Handle the first character + { + const byte = try in.takeByte(); + std.debug.assert(!std.ascii.isWhitespace(byte)); + if (byte == '.') + return error.InvalidSubject; + + try subject_list.append(alloc, byte); + } + + while (in.takeByte() catch null) |byte| { + if (std.ascii.isWhitespace(byte)) break; + if (std.ascii.isAscii(byte)) { + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) + return error.InvalidSubject; + } + try subject_list.append(alloc, byte); + } + } else return error.InvalidStream; + return subject_list.toOwnedSlice(alloc); +} + fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { var reader: std.json.Reader = .init(alloc, in); return std.json.innerParse(T, alloc, &reader, .{ |
