diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-03 05:53:23 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-03 05:54:14 +0000 |
| commit | dcd09e2f10a95d334a598ea8853c4f0e326fcfb2 (patch) | |
| tree | 346fcbdb54d459ffa4bf53eb2cebbfabd2148169 /src/server/message_parser.zig | |
| parent | bd9829f6842f0c989389aa4ce9784ab6e3cb4ee5 (diff) | |
cleanup imports
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 336 |
1 files changed, 181 insertions, 155 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 54149cb..1e7527d 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -1,4 +1,21 @@ const std = @import("std"); +const Allocator = std.mem.Allocator; +const ArenaAllocator = std.heap.ArenaAllocator; +const ArrayList = std.ArrayList; +const StaticStringMap = std.StaticStringMap; + +const Io = std.Io; +const AllocatingWriter = Io.Writer.Allocating; +const Reader = Io.Reader; + +const ascii = std.ascii; +const isDigit = std.ascii.isDigit; +const isUpper = std.ascii.isUpper; +const isWhitespace = std.ascii.isWhitespace; + +const parseUnsigned = std.fmt.parseUnsigned; + +const log = std.log; pub const MessageType = @typeInfo(Message).@"union".tag_type.?; @@ -60,7 +77,7 @@ pub const Message = union(enum) { headers: ?bool = null, nkey: ?[]const u8 = null, - pub fn deinit(self: Connect, alloc: std.mem.Allocator) void { + pub fn deinit(self: Connect, alloc: Allocator) void { if (self.auth_token) |a| alloc.free(a); if (self.user) |u| alloc.free(u); if (self.pass) |p| alloc.free(p); @@ -72,7 +89,7 @@ pub const Message = union(enum) { if (self.nkey) |n| alloc.free(n); } - pub fn dupe(self: Connect, alloc: std.mem.Allocator) !Connect { + pub fn dupe(self: Connect, alloc: Allocator) !Connect { var res = self; res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; errdefer if (res.auth_token) |a| alloc.free(a); @@ -103,13 +120,13 @@ pub const Message = union(enum) { /// The message payload data. payload: []const u8, - pub fn deinit(self: Pub, alloc: std.mem.Allocator) void { + pub fn deinit(self: Pub, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.payload); if (self.reply_to) |r| alloc.free(r); } - pub fn toMsg(self: Pub, alloc: std.mem.Allocator, sid: []const u8) !Msg { + pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { const res: Msg = .{ .subject = self.subject, .sid = sid, @@ -123,11 +140,11 @@ pub const Message = union(enum) { header_bytes: usize, @"pub": Pub, - pub fn deinit(self: HPub, alloc: std.mem.Allocator) void { + pub fn deinit(self: HPub, alloc: Allocator) void { self.@"pub".deinit(alloc); } - pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg { + pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { return .{ .header_bytes = self.header_bytes, .msg = try self.@"pub".toMsg(alloc, sid), @@ -139,11 +156,11 @@ pub const Message = union(enum) { header_bytes: usize, msg: Msg, - pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void { + pub fn deinit(self: HMsg, alloc: Allocator) void { self.msg.deinit(alloc); } - pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg { + pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { var res = self; res.msg = try self.msg.dupe(alloc); return res; @@ -157,7 +174,7 @@ pub const Message = union(enum) { /// A unique alphanumeric subscription ID, generated by the client. sid: []const u8, - pub fn deinit(self: Sub, alloc: std.mem.Allocator) void { + pub fn deinit(self: Sub, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); if (self.queue_group) |q| alloc.free(q); @@ -169,7 +186,7 @@ pub const Message = union(enum) { /// A number of messages to wait for before automatically unsubscribing. max_msgs: ?usize = null, - pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void { + pub fn deinit(self: Unsub, alloc: Allocator) void { alloc.free(self.sid); } }; @@ -179,14 +196,14 @@ pub const Message = union(enum) { reply_to: ?[]const u8, payload: []const u8, - pub fn deinit(self: Msg, alloc: std.mem.Allocator) void { + pub fn deinit(self: Msg, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); if (self.reply_to) |r| alloc.free(r); alloc.free(self.payload); } - pub fn dupe(self: Msg, alloc: std.mem.Allocator) !Msg { + pub fn dupe(self: Msg, alloc: Allocator) !Msg { var res: Msg = undefined; res.subject = try alloc.dupe(u8, self.subject); errdefer alloc.free(res.subject); @@ -200,7 +217,7 @@ pub const Message = union(enum) { } }; - const client_types = std.StaticStringMap(MessageType).initComptime( + const client_types = StaticStringMap(MessageType).initComptime( .{ // {"INFO", .info}, .{ "CONNECT", .connect }, @@ -223,8 +240,8 @@ pub const Message = union(enum) { pub const parse = parseStaticStringMap; /// An error should be handled by cleaning up this connection. - pub fn next(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { - var operation_string: std.ArrayList(u8) = blk: { + pub fn next(alloc: Allocator, in: *Reader) !Message { + var operation_string: ArrayList(u8) = blk: { comptime var buf_len = 0; comptime { for (client_types.keys()) |key| { @@ -236,28 +253,28 @@ pub const Message = union(enum) { }; while (in.peekByte()) |byte| { - if (std.ascii.isUpper(byte)) { + if (isUpper(byte)) { try operation_string.appendBounded(byte); in.toss(1); } else break; } else |err| return err; const operation = parse(operation_string.items) orelse { - std.log.err("Invalid operation: '{s}'", .{operation_string.items}); + log.err("Invalid operation: '{s}'", .{operation_string.items}); return error.InvalidOperation; }; - errdefer std.log.err("Failed to parse {s}", .{operation_string.items}); + errdefer log.err("Failed to parse {s}", .{operation_string.items}); switch (operation) { .connect => { // for storing the json string - var connect_string_writer_allocating: std.Io.Writer.Allocating = .init(alloc); + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); defer connect_string_writer_allocating.deinit(); var connect_string_writer = &connect_string_writer_allocating.writer; // for parsing the json string - var connect_arena_allocator: std.heap.ArenaAllocator = .init(alloc); + var connect_arena_allocator: ArenaAllocator = .init(alloc); defer connect_arena_allocator.deinit(); const connect_allocator = connect_arena_allocator.allocator(); @@ -307,7 +324,7 @@ pub const Message = union(enum) { } }; -fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { +fn parseSub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space const subject = try readSubject(alloc, in, .sub); @@ -319,15 +336,15 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { in_end, }; - var second: std.ArrayList(u8) = .empty; + var second: ArrayList(u8) = .empty; errdefer second.deinit(alloc); - var third: ?std.ArrayList(u8) = null; + var third: ?ArrayList(u8) = null; errdefer if (third) |*t| t.deinit(alloc); sw: switch (@as(States, .before_second)) { .before_second => { const byte = try in.peekByte(); - if (std.ascii.isWhitespace(byte)) { + if (isWhitespace(byte)) { in.toss(1); continue :sw .before_second; } @@ -335,7 +352,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { }, .in_second => { const byte = try in.peekByte(); - if (!std.ascii.isWhitespace(byte)) { + if (!isWhitespace(byte)) { try second.append(alloc, byte); in.toss(1); continue :sw .in_second; @@ -346,7 +363,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isWhitespace(byte)) { + } else if (isWhitespace(byte)) { in.toss(1); continue :sw .after_second; } @@ -377,11 +394,13 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { } test parseSub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; { - var in: std.Io.Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "foo", @@ -393,10 +412,10 @@ test parseSub { ); } { - var in: std.Io.Reader = .fixed(" foo 1\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "foo", @@ -408,10 +427,10 @@ test parseSub { ); } { - var in: std.Io.Reader = .fixed(" foo q 1\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "foo", @@ -423,10 +442,10 @@ test parseSub { ); } { - var in: std.Io.Reader = .fixed(" 1 q 1\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" 1 q 1\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "1", @@ -438,10 +457,10 @@ test parseSub { ); } { - var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "$SRV.PING", @@ -453,10 +472,10 @@ test parseSub { ); } { - var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n"); - var res = try parseSub(std.testing.allocator, &in); - defer res.sub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo.echo q 10\r\n"); + var res = try parseSub(alloc, &in); + defer res.sub.deinit(alloc); + try expectEqualDeep( Message{ .sub = .{ .subject = "foo.echo", @@ -469,7 +488,7 @@ test parseSub { } } -fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { +fn parseUnsub(alloc: Allocator, in: *Reader) !Message { const States = enum { before_first, in_first, @@ -478,15 +497,15 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { in_end, }; - var first: std.ArrayList(u8) = .empty; + var first: ArrayList(u8) = .empty; errdefer first.deinit(alloc); - var second: ?std.ArrayList(u8) = null; + var second: ?ArrayList(u8) = null; defer if (second) |*s| s.deinit(alloc); sw: switch (@as(States, .before_first)) { .before_first => { const byte = try in.peekByte(); - if (std.ascii.isWhitespace(byte)) { + if (isWhitespace(byte)) { in.toss(1); continue :sw .before_first; } @@ -494,7 +513,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { }, .in_first => { const byte = try in.peekByte(); - if (!std.ascii.isWhitespace(byte)) { + if (!isWhitespace(byte)) { try first.append(alloc, byte); in.toss(1); continue :sw .in_first; @@ -505,7 +524,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isWhitespace(byte)) { + } else if (isWhitespace(byte)) { in.toss(1); continue :sw .after_first; } @@ -529,17 +548,20 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { return .{ .unsub = .{ .sid = try first.toOwnedSlice(alloc), - .max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null, + .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, }, }; } test parseUnsub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; { - var in: std.Io.Reader = .fixed(" 1\r\n"); - var res = try parseUnsub(std.testing.allocator, &in); - defer res.unsub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.unsub.deinit(alloc); + try expectEqualDeep( Message{ .unsub = .{ .sid = "1", @@ -548,14 +570,14 @@ test parseUnsub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } { - var in: std.Io.Reader = .fixed(" 1 1\r\n"); - var res = try parseUnsub(std.testing.allocator, &in); - defer res.unsub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" 1 1\r\n"); + var res = try parseUnsub(alloc, &in); + defer res.unsub.deinit(alloc); + try expectEqualDeep( Message{ .unsub = .{ .sid = "1", @@ -564,11 +586,11 @@ test parseUnsub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } } -fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { +fn parsePub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space // Parse subject @@ -583,18 +605,18 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { in_end, }; - var second: std.ArrayList(u8) = .empty; + var second: ArrayList(u8) = .empty; defer second.deinit(alloc); - var third: ?std.ArrayList(u8) = null; + var third: ?ArrayList(u8) = null; defer if (third) |*t| t.deinit(alloc); - var payload: std.Io.Writer.Allocating = .init(alloc); + var payload: AllocatingWriter = .init(alloc); errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { // Drop whitespace const byte = try in.peekByte(); - if (std.ascii.isWhitespace(byte)) { + if (isWhitespace(byte)) { in.toss(1); continue :sw .before_second; } @@ -602,7 +624,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { }, .in_second => { const byte = try in.peekByte(); - if (!std.ascii.isWhitespace(byte)) { + if (!isWhitespace(byte)) { try second.append(alloc, byte); in.toss(1); continue :sw .in_second; @@ -613,7 +635,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isWhitespace(byte)) { + } else if (isWhitespace(byte)) { in.toss(1); continue :sw .after_second; } @@ -624,7 +646,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isDigit(byte)) { + } else if (isDigit(byte)) { try third.?.append(alloc, byte); in.toss(1); continue :sw .in_third; @@ -639,10 +661,10 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const reply_to: ?[]const u8, const bytes: usize = if (third) |t| .{ try alloc.dupe(u8, second.items), - try std.fmt.parseUnsigned(usize, t.items, 10), + try parseUnsigned(usize, t.items, 10), } else .{ null, - try std.fmt.parseUnsigned(usize, second.items, 10), + try parseUnsigned(usize, second.items, 10), }; try in.streamExact(&payload.writer, bytes); @@ -658,11 +680,14 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { } test parsePub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; { - var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n"); - var res = try parsePub(std.testing.allocator, &in); - defer res.@"pub".deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.@"pub".deinit(alloc); + try expectEqualDeep( Message{ .@"pub" = .{ .subject = "foo", @@ -672,14 +697,14 @@ test parsePub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } { - var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); - var res = try parsePub(std.testing.allocator, &in); - defer res.@"pub".deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.@"pub".deinit(alloc); + try expectEqualDeep( Message{ .@"pub" = .{ .subject = "foo", @@ -689,15 +714,15 @@ test parsePub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } // numeric reply subject { - var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n"); - var res = try parsePub(std.testing.allocator, &in); - defer res.@"pub".deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try parsePub(alloc, &in); + defer res.@"pub".deinit(alloc); + try expectEqualDeep( Message{ .@"pub" = .{ .subject = "foo", @@ -707,11 +732,11 @@ test parsePub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } } -fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { +fn parseHPub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space // Parse subject @@ -728,20 +753,20 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { in_end, }; - var second: std.ArrayList(u8) = .empty; + var second: ArrayList(u8) = .empty; defer second.deinit(alloc); - var third: std.ArrayList(u8) = .empty; + var third: ArrayList(u8) = .empty; defer third.deinit(alloc); - var fourth: ?std.ArrayList(u8) = null; + var fourth: ?ArrayList(u8) = null; defer if (fourth) |*f| f.deinit(alloc); - var payload: std.Io.Writer.Allocating = .init(alloc); + var payload: AllocatingWriter = .init(alloc); errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { // Drop whitespace const byte = try in.peekByte(); - if (std.ascii.isWhitespace(byte)) { + if (isWhitespace(byte)) { in.toss(1); continue :sw .before_second; } @@ -749,7 +774,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { }, .in_second => { const byte = try in.peekByte(); - if (!std.ascii.isWhitespace(byte)) { + if (!isWhitespace(byte)) { try second.append(alloc, byte); in.toss(1); continue :sw .in_second; @@ -760,7 +785,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isWhitespace(byte)) { + } else if (isWhitespace(byte)) { in.toss(1); continue :sw .after_second; } @@ -769,7 +794,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { }, .in_third => { const byte = try in.peekByte(); - if (!std.ascii.isWhitespace(byte)) { + if (!isWhitespace(byte)) { try third.append(alloc, byte); in.toss(1); continue :sw .in_third; @@ -780,7 +805,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isWhitespace(byte)) { + } else if (isWhitespace(byte)) { in.toss(1); continue :sw .after_third; } @@ -791,7 +816,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const byte = try in.peekByte(); if (byte == '\r') { continue :sw .in_end; - } else if (std.ascii.isDigit(byte)) { + } else if (isDigit(byte)) { try fourth.?.append(alloc, byte); in.toss(1); continue :sw .in_fourth; @@ -806,12 +831,12 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = if (fourth) |f| .{ try alloc.dupe(u8, second.items), - try std.fmt.parseUnsigned(usize, third.items, 10), - try std.fmt.parseUnsigned(usize, f.items, 10), + try parseUnsigned(usize, third.items, 10), + try parseUnsigned(usize, f.items, 10), } else .{ null, - try std.fmt.parseUnsigned(usize, second.items, 10), - try std.fmt.parseUnsigned(usize, third.items, 10), + try parseUnsigned(usize, second.items, 10), + try parseUnsigned(usize, third.items, 10), }; try in.streamExact(&payload.writer, total_bytes); @@ -830,11 +855,14 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { } test parseHPub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; { - var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(std.testing.allocator, &in); - defer res.hpub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.hpub.deinit(alloc); + try expectEqualDeep( Message{ .hpub = .{ .header_bytes = 22, @@ -847,14 +875,14 @@ test parseHPub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } { - var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(std.testing.allocator, &in); - defer res.hpub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.hpub.deinit(alloc); + try expectEqualDeep( Message{ .hpub = .{ .header_bytes = 22, @@ -867,14 +895,14 @@ test parseHPub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } { - var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); - var res = try parseHPub(std.testing.allocator, &in); - defer res.hpub.deinit(std.testing.allocator); - try std.testing.expectEqualDeep( + var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try parseHPub(alloc, &in); + defer res.hpub.deinit(alloc); + try expectEqualDeep( Message{ .hpub = .{ .header_bytes = 22, @@ -887,18 +915,18 @@ test parseHPub { }, res, ); - try std.testing.expectEqual(0, in.buffered().len); + try expectEqual(0, in.buffered().len); } } -fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { - var subject_list: std.ArrayList(u8) = .empty; +fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { + var subject_list: ArrayList(u8) = .empty; errdefer subject_list.deinit(alloc); // Handle the first character { const byte = try in.takeByte(); - if (std.ascii.isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) + if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) return error.InvalidStream; try subject_list.append(alloc, byte); @@ -907,37 +935,33 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub switch (pub_or_sub) { .sub => { while (in.takeByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - if (std.ascii.isAscii(byte)) { - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '>') { - const next_byte = try in.takeByte(); - if (!std.ascii.isWhitespace(next_byte)) - return error.InvalidStream; - } else if (byte == '*') { - const next_byte = try in.peekByte(); - if (next_byte != '.' and !std.ascii.isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); + if (isWhitespace(byte)) break; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '>') { + const next_byte = try in.takeByte(); + if (!isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '*') { + const next_byte = try in.peekByte(); + if (next_byte != '.' and !isWhitespace(next_byte)) + return error.InvalidStream; } + try subject_list.append(alloc, byte); } else |err| return err; }, .@"pub" => { while (in.takeByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - if (std.ascii.isAscii(byte)) { - if (byte == '*' or byte == '>') return error.InvalidStream; - if (byte == '.') { - const next_byte = try in.peekByte(); - if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) - return error.InvalidStream; - } - try subject_list.append(alloc, byte); + if (isWhitespace(byte)) break; + if (byte == '*' or byte == '>') return error.InvalidStream; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; } + try subject_list.append(alloc, byte); } else |err| return err; }, } @@ -945,7 +969,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub return subject_list.toOwnedSlice(alloc); } -inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void { +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { @branchHint(.unlikely); return error.InvalidStream; @@ -953,12 +977,14 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void } test "parsing a stream" { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ "\nPUB hi 3\r\nfoo\r\n"; - var reader: std.Io.Reader = .fixed(input); - var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + var reader: Reader = .fixed(input); + var arena: ArenaAllocator = .init(alloc); defer arena.deinit(); const gpa = arena.allocator(); @@ -977,7 +1003,7 @@ test "parsing a stream" { .no_responders = true, } }; - try std.testing.expectEqualDeep(expected, msg); + try expectEqualDeep(expected, msg); } { const msg: Message = try Message.next(gpa, &reader); @@ -985,6 +1011,6 @@ test "parsing a stream" { .subject = "hi", .payload = "foo", } }; - try std.testing.expectEqualDeep(expected, msg); + try expectEqualDeep(expected, msg); } } |
