diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-11-18 13:49:39 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-11-19 08:44:40 -0500 |
| commit | 51008cd7e17d7e30b43107140781a72f10a58830 (patch) | |
| tree | bf0f514299ff4f6b967ddc16c109cbb3bca61f1f /src/server/message_parser.zig | |
| parent | d6d177aede7f314b9120b46c038493da51763815 (diff) | |
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig new file mode 100644 index 0000000..c8359d5 --- /dev/null +++ b/src/server/message_parser.zig @@ -0,0 +1,168 @@ +const std = @import("std"); + +pub const MessageType = enum { + info, + connect, + @"pub", + hpub, + sub, + unsub, + msg, + hmsg, + ping, + pong, + @"+ok", + @"-err", + + fn parseMemEql(input: []const u8) ?MessageType { + // if (std.mem.eql(u8, "INFO", input)) return .info; + if (std.mem.eql(u8, "CONNECT", input)) return .connect; + if (std.mem.eql(u8, "PUB", input)) return .@"pub"; + if (std.mem.eql(u8, "HPUB", input)) return .hpub; + if (std.mem.eql(u8, "SUB", input)) return .sub; + if (std.mem.eql(u8, "UNSUB", input)) return .unsub; + // if (std.mem.eql(u8, "MSG", input)) return .msg; + // if (std.mem.eql(u8, "HMSG", input)) return .hmsg; + if (std.mem.eql(u8, "PING", input)) return .ping; + if (std.mem.eql(u8, "PONG", input)) return .pong; + // if (std.mem.eql(u8, "@"+OK"", input)) return .@"+ok"; + // if (std.mem.eql(u8, "@"-ERR"", input)) return .@"-err"; + return error.InvalidMessageType; + } + + const client_types = std.StaticStringMap(MessageType).initComptime( + .{ + // {"INFO", .info}, + .{ "CONNECT", .connect }, + .{ "PUB", .@"pub" }, + .{ "HPUB", .hpub }, + .{ "SUB", .sub }, + .{ "UNSUB", .unsub }, + // {"MSG", .msg}, + // {"HMSG", .hmsg}, + .{ "PING", .ping }, + .{ "PONG", .pong }, + // {"+OK", .@"+ok"}, + // {"-ERR", .@"-err"}, + }, + ); + fn parseStaticStringMap(input: []const u8) ?MessageType { + std.debug.print("input: '{s}'\n", .{input}); + return client_types.get(input); + } + + pub const parse = parseStaticStringMap; +}; + +const Message = union(MessageType) { + info: void, + + connect: Connect, + @"pub": Pub, + hpub: void, + sub: void, + unsub: void, + msg: void, + hmsg: void, + ping, + pong, + @"+ok": void, + @"-err": void, + const Connect = struct { + verbose: bool = false, + pedantic: bool = false, + tls_required: bool = false, + auth_token: ?[]const u8 = null, + user: ?[]const u8 = null, + pass: ?[]const u8 = null, + name: ?[]const u8 = null, + lang: []const u8, + version: []const u8, + protocol: u32, + echo: ?bool = null, + sig: ?[]const u8 = null, + jwt: ?[]const u8 = null, + no_responders: ?bool = null, + headers: ?bool = null, + nkey: ?[]const u8 = null, + }; + const Pub = struct { + subject: []const u8, + reply_to: ?[]const u8, + bytes: usize, + payload: []const u8, + }; +}; + +fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { + var json_reader: std.json.Reader = .init(alloc, in); + defer json_reader.deinit(); + + return std.json.parseFromTokenSourceLeaky(T, alloc, &json_reader, .{}); +} + +fn parsePub(in: *std.Io.Reader) !Message.Pub { + const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; + const next = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; + var reply_to: ?[]const u8 = null; + const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: { + reply_to = next; + break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10); + }; + // in.toss(2); // CRLF + const payload = try in.take(bytes); + + return .{ + .subject = subject, + .reply_to = reply_to, + .bytes = bytes, + .payload = payload, + }; +} + +// try returning error in debug mode, only null in release? +pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { + const message_type: MessageType = blk: { + const word: []const u8 = (in.takeDelimiter(' ') catch return null) orelse return null; + std.debug.print("word: {s}\n", .{word}); + break :blk MessageType.parse(word) orelse return null; + }; + // defer in.toss(2); // CRLF + return switch (message_type) { + .connect => .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }, + .@"pub" => .{ .@"pub" = parsePub(in) catch return null }, + .ping => .{ .ping = {} }, + else => null, + }; +} + +test parseNextMessage { + const input = + \\CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS CLI Version v0.2.4","lang":"go","version":"1.43.0","protocol":1,"echo":true,"headers":true,"no_responders":true} + ; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + const msg: ?Message = parseNextMessage(gpa, &reader); + const expected: ?Message = .{ .connect = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + } }; + try std.testing.expect(msg != null); + try std.testing.expectEqualDeep(msg, expected); +} + +test "MessageType.parse performance" { + // Measure perf for parseMemEql + // Measure perf for parseStaticStringMap + // assert parse = fastest perf +} |
