diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 19:38:36 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 20:26:41 +0000 |
| commit | fc68749669a3bd9e0530d5958b100262537f142a (patch) | |
| tree | 64d170e2b8f6e0190caf1ed1e2749cc043eed024 /src/server/message_parser.zig | |
| parent | 987dc492a6ad8e3b4bd2f369d676a2d588342543 (diff) | |
gracefully exit
simplify code
clean up dead code
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 137 |
1 files changed, 37 insertions, 100 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index b156dd6..6561c4b 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -43,7 +43,7 @@ pub const Message = union(MessageType) { ping, pong, @"+ok": void, - @"-err": void, + @"-err": []const u8, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, @@ -416,34 +416,6 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { return subject_list.toOwnedSlice(alloc); } -fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { - var reader: std.json.Reader = .init(alloc, in); - return std.json.innerParse(T, alloc, &reader, .{ - .max_value_len = std.json.default_max_value_len, - .allocate = .alloc_always, - }); -} - -fn parsePub(in: *std.Io.Reader) !Message.Pub { - const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; - const next = (try in.takeDelimiter('\r')) orelse return error.EndOfStream; - var reply_to: ?[]const u8 = null; - const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: { - reply_to = next; - break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10); - }; - - in.toss(1); // LF - const payload = try in.take(bytes); - - return .{ - .subject = subject, - .reply_to = reply_to, - .bytes = bytes, - .payload = payload, - }; -} - inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { @branchHint(.unlikely); @@ -451,74 +423,39 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void } } -// try returning error in debug mode, only null in release? -// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { -// const message_type: MessageType = blk: { -// var word: ["CONNECT".len]u8 = undefined; -// var len: usize = 0; -// for (&word, 0..) |*b, i| { -// const byte = in.takeByte() catch return null; -// if (std.ascii.isUpper(byte)) { -// b.* = byte; -// len = i + 1; -// } else break; -// } - -// break :blk Message.parse(word[0..len]) orelse return null; -// }; - -// // defer in.toss(2); // CRLF -// return switch (message_type) { -// .connect => blk: { -// const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }; - -// break :blk value; -// }, -// .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, -// .ping => .ping, -// else => null, -// }; -// } - -// test parseNextMessage { -// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n"; -// var reader: std.Io.Reader = .fixed(input); -// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); -// defer arena.deinit(); -// const gpa = arena.allocator(); - -// { -// const msg: Message = try Message.next(gpa, &reader); -// const expected: Message = .{ .connect = .{ -// .connect = .{ -// .verbose = false, -// .pedantic = false, -// .tls_required = false, -// .name = try gpa.dupe(u8, "NATS CLI Version v0.2.4"), -// .lang = try gpa.dupe(u8, "go"), -// .version = try gpa.dupe(u8, "1.43.0"), -// .protocol = 1, -// .echo = true, -// .headers = true, -// .no_responders = true, -// }, -// .allocator = arena, -// } }; - -// try std.testing.expectEqualDeep(expected, msg); -// } -// { -// const msg: Message = try Message.next(gpa, &reader); -// const expected: Message = .{ .@"pub" = .{ -// .subject = "hi", -// .payload = "foo", -// } }; -// try std.testing.expectEqualDeep(expected, msg); -// } -// } - -// test "MessageType.parse performance" { -// // Measure perf for parseMemEql -// // Measure perf for parseStaticStringMap -// // assert parse = fastest perf -// } +test "parsing a stream" { + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ + "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ + ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ + "\nPUB hi 3\r\nfoo\r\n"; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ .connect = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + } }; + + try std.testing.expectEqualDeep(expected, msg); + } + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ .@"pub" = .{ + .subject = "hi", + .payload = "foo", + } }; + try std.testing.expectEqualDeep(expected, msg); + } +} |
