summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 19:38:36 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 20:26:41 +0000
commitfc68749669a3bd9e0530d5958b100262537f142a (patch)
tree64d170e2b8f6e0190caf1ed1e2749cc043eed024 /src/server/message_parser.zig
parent987dc492a6ad8e3b4bd2f369d676a2d588342543 (diff)
gracefully exit
simplify code clean up dead code
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig137
1 files changed, 37 insertions, 100 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index b156dd6..6561c4b 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -43,7 +43,7 @@ pub const Message = union(MessageType) {
ping,
pong,
@"+ok": void,
- @"-err": void,
+ @"-err": []const u8,
pub const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,
@@ -416,34 +416,6 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
return subject_list.toOwnedSlice(alloc);
}
-fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T {
- var reader: std.json.Reader = .init(alloc, in);
- return std.json.innerParse(T, alloc, &reader, .{
- .max_value_len = std.json.default_max_value_len,
- .allocate = .alloc_always,
- });
-}
-
-fn parsePub(in: *std.Io.Reader) !Message.Pub {
- const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream;
- const next = (try in.takeDelimiter('\r')) orelse return error.EndOfStream;
- var reply_to: ?[]const u8 = null;
- const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: {
- reply_to = next;
- break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10);
- };
-
- in.toss(1); // LF
- const payload = try in.take(bytes);
-
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .bytes = bytes,
- .payload = payload,
- };
-}
-
inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
@@ -451,74 +423,39 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void
}
}
-// try returning error in debug mode, only null in release?
-// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message {
-// const message_type: MessageType = blk: {
-// var word: ["CONNECT".len]u8 = undefined;
-// var len: usize = 0;
-// for (&word, 0..) |*b, i| {
-// const byte = in.takeByte() catch return null;
-// if (std.ascii.isUpper(byte)) {
-// b.* = byte;
-// len = i + 1;
-// } else break;
-// }
-
-// break :blk Message.parse(word[0..len]) orelse return null;
-// };
-
-// // defer in.toss(2); // CRLF
-// return switch (message_type) {
-// .connect => blk: {
-// const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null };
-
-// break :blk value;
-// },
-// .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) },
-// .ping => .ping,
-// else => null,
-// };
-// }
-
-// test parseNextMessage {
-// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n";
-// var reader: std.Io.Reader = .fixed(input);
-// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator);
-// defer arena.deinit();
-// const gpa = arena.allocator();
-
-// {
-// const msg: Message = try Message.next(gpa, &reader);
-// const expected: Message = .{ .connect = .{
-// .connect = .{
-// .verbose = false,
-// .pedantic = false,
-// .tls_required = false,
-// .name = try gpa.dupe(u8, "NATS CLI Version v0.2.4"),
-// .lang = try gpa.dupe(u8, "go"),
-// .version = try gpa.dupe(u8, "1.43.0"),
-// .protocol = 1,
-// .echo = true,
-// .headers = true,
-// .no_responders = true,
-// },
-// .allocator = arena,
-// } };
-
-// try std.testing.expectEqualDeep(expected, msg);
-// }
-// {
-// const msg: Message = try Message.next(gpa, &reader);
-// const expected: Message = .{ .@"pub" = .{
-// .subject = "hi",
-// .payload = "foo",
-// } };
-// try std.testing.expectEqualDeep(expected, msg);
-// }
-// }
-
-// test "MessageType.parse performance" {
-// // Measure perf for parseMemEql
-// // Measure perf for parseStaticStringMap
-// // assert parse = fastest perf
-// }
+test "parsing a stream" {
+ const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
+ "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
+ ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
+ "\nPUB hi 3\r\nfoo\r\n";
+ var reader: std.Io.Reader = .fixed(input);
+ var arena: std.heap.ArenaAllocator = .init(std.testing.allocator);
+ defer arena.deinit();
+ const gpa = arena.allocator();
+
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{ .connect = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
+ } };
+
+ try std.testing.expectEqualDeep(expected, msg);
+ }
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{ .@"pub" = .{
+ .subject = "hi",
+ .payload = "foo",
+ } };
+ try std.testing.expectEqualDeep(expected, msg);
+ }
+}