summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-03 05:53:23 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-03 05:54:14 +0000
commitdcd09e2f10a95d334a598ea8853c4f0e326fcfb2 (patch)
tree346fcbdb54d459ffa4bf53eb2cebbfabd2148169
parentbd9829f6842f0c989389aa4ce9784ab6e3cb4ee5 (diff)
cleanup imports
-rw-r--r--src/server/message_parser.zig336
1 files changed, 181 insertions, 155 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 54149cb..1e7527d 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -1,4 +1,21 @@
const std = @import("std");
+const Allocator = std.mem.Allocator;
+const ArenaAllocator = std.heap.ArenaAllocator;
+const ArrayList = std.ArrayList;
+const StaticStringMap = std.StaticStringMap;
+
+const Io = std.Io;
+const AllocatingWriter = Io.Writer.Allocating;
+const Reader = Io.Reader;
+
+const ascii = std.ascii;
+const isDigit = std.ascii.isDigit;
+const isUpper = std.ascii.isUpper;
+const isWhitespace = std.ascii.isWhitespace;
+
+const parseUnsigned = std.fmt.parseUnsigned;
+
+const log = std.log;
pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
@@ -60,7 +77,7 @@ pub const Message = union(enum) {
headers: ?bool = null,
nkey: ?[]const u8 = null,
- pub fn deinit(self: Connect, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: Connect, alloc: Allocator) void {
if (self.auth_token) |a| alloc.free(a);
if (self.user) |u| alloc.free(u);
if (self.pass) |p| alloc.free(p);
@@ -72,7 +89,7 @@ pub const Message = union(enum) {
if (self.nkey) |n| alloc.free(n);
}
- pub fn dupe(self: Connect, alloc: std.mem.Allocator) !Connect {
+ pub fn dupe(self: Connect, alloc: Allocator) !Connect {
var res = self;
res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null;
errdefer if (res.auth_token) |a| alloc.free(a);
@@ -103,13 +120,13 @@ pub const Message = union(enum) {
/// The message payload data.
payload: []const u8,
- pub fn deinit(self: Pub, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: Pub, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.payload);
if (self.reply_to) |r| alloc.free(r);
}
- pub fn toMsg(self: Pub, alloc: std.mem.Allocator, sid: []const u8) !Msg {
+ pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
const res: Msg = .{
.subject = self.subject,
.sid = sid,
@@ -123,11 +140,11 @@ pub const Message = union(enum) {
header_bytes: usize,
@"pub": Pub,
- pub fn deinit(self: HPub, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: HPub, alloc: Allocator) void {
self.@"pub".deinit(alloc);
}
- pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg {
+ pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
return .{
.header_bytes = self.header_bytes,
.msg = try self.@"pub".toMsg(alloc, sid),
@@ -139,11 +156,11 @@ pub const Message = union(enum) {
header_bytes: usize,
msg: Msg,
- pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: HMsg, alloc: Allocator) void {
self.msg.deinit(alloc);
}
- pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg {
+ pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
var res = self;
res.msg = try self.msg.dupe(alloc);
return res;
@@ -157,7 +174,7 @@ pub const Message = union(enum) {
/// A unique alphanumeric subscription ID, generated by the client.
sid: []const u8,
- pub fn deinit(self: Sub, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: Sub, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
if (self.queue_group) |q| alloc.free(q);
@@ -169,7 +186,7 @@ pub const Message = union(enum) {
/// A number of messages to wait for before automatically unsubscribing.
max_msgs: ?usize = null,
- pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: Unsub, alloc: Allocator) void {
alloc.free(self.sid);
}
};
@@ -179,14 +196,14 @@ pub const Message = union(enum) {
reply_to: ?[]const u8,
payload: []const u8,
- pub fn deinit(self: Msg, alloc: std.mem.Allocator) void {
+ pub fn deinit(self: Msg, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
if (self.reply_to) |r| alloc.free(r);
alloc.free(self.payload);
}
- pub fn dupe(self: Msg, alloc: std.mem.Allocator) !Msg {
+ pub fn dupe(self: Msg, alloc: Allocator) !Msg {
var res: Msg = undefined;
res.subject = try alloc.dupe(u8, self.subject);
errdefer alloc.free(res.subject);
@@ -200,7 +217,7 @@ pub const Message = union(enum) {
}
};
- const client_types = std.StaticStringMap(MessageType).initComptime(
+ const client_types = StaticStringMap(MessageType).initComptime(
.{
// {"INFO", .info},
.{ "CONNECT", .connect },
@@ -223,8 +240,8 @@ pub const Message = union(enum) {
pub const parse = parseStaticStringMap;
/// An error should be handled by cleaning up this connection.
- pub fn next(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
- var operation_string: std.ArrayList(u8) = blk: {
+ pub fn next(alloc: Allocator, in: *Reader) !Message {
+ var operation_string: ArrayList(u8) = blk: {
comptime var buf_len = 0;
comptime {
for (client_types.keys()) |key| {
@@ -236,28 +253,28 @@ pub const Message = union(enum) {
};
while (in.peekByte()) |byte| {
- if (std.ascii.isUpper(byte)) {
+ if (isUpper(byte)) {
try operation_string.appendBounded(byte);
in.toss(1);
} else break;
} else |err| return err;
const operation = parse(operation_string.items) orelse {
- std.log.err("Invalid operation: '{s}'", .{operation_string.items});
+ log.err("Invalid operation: '{s}'", .{operation_string.items});
return error.InvalidOperation;
};
- errdefer std.log.err("Failed to parse {s}", .{operation_string.items});
+ errdefer log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) {
.connect => {
// for storing the json string
- var connect_string_writer_allocating: std.Io.Writer.Allocating = .init(alloc);
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit();
var connect_string_writer = &connect_string_writer_allocating.writer;
// for parsing the json string
- var connect_arena_allocator: std.heap.ArenaAllocator = .init(alloc);
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
defer connect_arena_allocator.deinit();
const connect_allocator = connect_arena_allocator.allocator();
@@ -307,7 +324,7 @@ pub const Message = union(enum) {
}
};
-fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+fn parseSub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);
@@ -319,15 +336,15 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end,
};
- var second: std.ArrayList(u8) = .empty;
+ var second: ArrayList(u8) = .empty;
errdefer second.deinit(alloc);
- var third: ?std.ArrayList(u8) = null;
+ var third: ?ArrayList(u8) = null;
errdefer if (third) |*t| t.deinit(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
const byte = try in.peekByte();
- if (std.ascii.isWhitespace(byte)) {
+ if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
@@ -335,7 +352,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
},
.in_second => {
const byte = try in.peekByte();
- if (!std.ascii.isWhitespace(byte)) {
+ if (!isWhitespace(byte)) {
try second.append(alloc, byte);
in.toss(1);
continue :sw .in_second;
@@ -346,7 +363,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isWhitespace(byte)) {
+ } else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
@@ -377,11 +394,13 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}
test parseSub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
{
- var in: std.Io.Reader = .fixed(" foo 1\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
@@ -393,10 +412,10 @@ test parseSub {
);
}
{
- var in: std.Io.Reader = .fixed(" foo 1\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
@@ -408,10 +427,10 @@ test parseSub {
);
}
{
- var in: std.Io.Reader = .fixed(" foo q 1\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo q 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
@@ -423,10 +442,10 @@ test parseSub {
);
}
{
- var in: std.Io.Reader = .fixed(" 1 q 1\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" 1 q 1\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "1",
@@ -438,10 +457,10 @@ test parseSub {
);
}
{
- var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" $SRV.PING 4\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "$SRV.PING",
@@ -453,10 +472,10 @@ test parseSub {
);
}
{
- var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n");
- var res = try parseSub(std.testing.allocator, &in);
- defer res.sub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo.echo q 10\r\n");
+ var res = try parseSub(alloc, &in);
+ defer res.sub.deinit(alloc);
+ try expectEqualDeep(
Message{
.sub = .{
.subject = "foo.echo",
@@ -469,7 +488,7 @@ test parseSub {
}
}
-fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
const States = enum {
before_first,
in_first,
@@ -478,15 +497,15 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end,
};
- var first: std.ArrayList(u8) = .empty;
+ var first: ArrayList(u8) = .empty;
errdefer first.deinit(alloc);
- var second: ?std.ArrayList(u8) = null;
+ var second: ?ArrayList(u8) = null;
defer if (second) |*s| s.deinit(alloc);
sw: switch (@as(States, .before_first)) {
.before_first => {
const byte = try in.peekByte();
- if (std.ascii.isWhitespace(byte)) {
+ if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_first;
}
@@ -494,7 +513,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
},
.in_first => {
const byte = try in.peekByte();
- if (!std.ascii.isWhitespace(byte)) {
+ if (!isWhitespace(byte)) {
try first.append(alloc, byte);
in.toss(1);
continue :sw .in_first;
@@ -505,7 +524,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isWhitespace(byte)) {
+ } else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_first;
}
@@ -529,17 +548,20 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
return .{
.unsub = .{
.sid = try first.toOwnedSlice(alloc),
- .max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null,
+ .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
},
};
}
test parseUnsub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
{
- var in: std.Io.Reader = .fixed(" 1\r\n");
- var res = try parseUnsub(std.testing.allocator, &in);
- defer res.unsub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" 1\r\n");
+ var res = try parseUnsub(alloc, &in);
+ defer res.unsub.deinit(alloc);
+ try expectEqualDeep(
Message{
.unsub = .{
.sid = "1",
@@ -548,14 +570,14 @@ test parseUnsub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
{
- var in: std.Io.Reader = .fixed(" 1 1\r\n");
- var res = try parseUnsub(std.testing.allocator, &in);
- defer res.unsub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" 1 1\r\n");
+ var res = try parseUnsub(alloc, &in);
+ defer res.unsub.deinit(alloc);
+ try expectEqualDeep(
Message{
.unsub = .{
.sid = "1",
@@ -564,11 +586,11 @@ test parseUnsub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
}
-fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+fn parsePub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
@@ -583,18 +605,18 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end,
};
- var second: std.ArrayList(u8) = .empty;
+ var second: ArrayList(u8) = .empty;
defer second.deinit(alloc);
- var third: ?std.ArrayList(u8) = null;
+ var third: ?ArrayList(u8) = null;
defer if (third) |*t| t.deinit(alloc);
- var payload: std.Io.Writer.Allocating = .init(alloc);
+ var payload: AllocatingWriter = .init(alloc);
errdefer payload.deinit();
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
- if (std.ascii.isWhitespace(byte)) {
+ if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
@@ -602,7 +624,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
},
.in_second => {
const byte = try in.peekByte();
- if (!std.ascii.isWhitespace(byte)) {
+ if (!isWhitespace(byte)) {
try second.append(alloc, byte);
in.toss(1);
continue :sw .in_second;
@@ -613,7 +635,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isWhitespace(byte)) {
+ } else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
@@ -624,7 +646,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isDigit(byte)) {
+ } else if (isDigit(byte)) {
try third.?.append(alloc, byte);
in.toss(1);
continue :sw .in_third;
@@ -639,10 +661,10 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const reply_to: ?[]const u8, const bytes: usize =
if (third) |t| .{
try alloc.dupe(u8, second.items),
- try std.fmt.parseUnsigned(usize, t.items, 10),
+ try parseUnsigned(usize, t.items, 10),
} else .{
null,
- try std.fmt.parseUnsigned(usize, second.items, 10),
+ try parseUnsigned(usize, second.items, 10),
};
try in.streamExact(&payload.writer, bytes);
@@ -658,11 +680,14 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}
test parsePub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
{
- var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n");
- var res = try parsePub(std.testing.allocator, &in);
- defer res.@"pub".deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.@"pub".deinit(alloc);
+ try expectEqualDeep(
Message{
.@"pub" = .{
.subject = "foo",
@@ -672,14 +697,14 @@ test parsePub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
{
- var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
- var res = try parsePub(std.testing.allocator, &in);
- defer res.@"pub".deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.@"pub".deinit(alloc);
+ try expectEqualDeep(
Message{
.@"pub" = .{
.subject = "foo",
@@ -689,15 +714,15 @@ test parsePub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
// numeric reply subject
{
- var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n");
- var res = try parsePub(std.testing.allocator, &in);
- defer res.@"pub".deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try parsePub(alloc, &in);
+ defer res.@"pub".deinit(alloc);
+ try expectEqualDeep(
Message{
.@"pub" = .{
.subject = "foo",
@@ -707,11 +732,11 @@ test parsePub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
}
-fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+fn parseHPub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
@@ -728,20 +753,20 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end,
};
- var second: std.ArrayList(u8) = .empty;
+ var second: ArrayList(u8) = .empty;
defer second.deinit(alloc);
- var third: std.ArrayList(u8) = .empty;
+ var third: ArrayList(u8) = .empty;
defer third.deinit(alloc);
- var fourth: ?std.ArrayList(u8) = null;
+ var fourth: ?ArrayList(u8) = null;
defer if (fourth) |*f| f.deinit(alloc);
- var payload: std.Io.Writer.Allocating = .init(alloc);
+ var payload: AllocatingWriter = .init(alloc);
errdefer payload.deinit();
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
- if (std.ascii.isWhitespace(byte)) {
+ if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
@@ -749,7 +774,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
},
.in_second => {
const byte = try in.peekByte();
- if (!std.ascii.isWhitespace(byte)) {
+ if (!isWhitespace(byte)) {
try second.append(alloc, byte);
in.toss(1);
continue :sw .in_second;
@@ -760,7 +785,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isWhitespace(byte)) {
+ } else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
@@ -769,7 +794,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
},
.in_third => {
const byte = try in.peekByte();
- if (!std.ascii.isWhitespace(byte)) {
+ if (!isWhitespace(byte)) {
try third.append(alloc, byte);
in.toss(1);
continue :sw .in_third;
@@ -780,7 +805,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isWhitespace(byte)) {
+ } else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_third;
}
@@ -791,7 +816,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
- } else if (std.ascii.isDigit(byte)) {
+ } else if (isDigit(byte)) {
try fourth.?.append(alloc, byte);
in.toss(1);
continue :sw .in_fourth;
@@ -806,12 +831,12 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
if (fourth) |f| .{
try alloc.dupe(u8, second.items),
- try std.fmt.parseUnsigned(usize, third.items, 10),
- try std.fmt.parseUnsigned(usize, f.items, 10),
+ try parseUnsigned(usize, third.items, 10),
+ try parseUnsigned(usize, f.items, 10),
} else .{
null,
- try std.fmt.parseUnsigned(usize, second.items, 10),
- try std.fmt.parseUnsigned(usize, third.items, 10),
+ try parseUnsigned(usize, second.items, 10),
+ try parseUnsigned(usize, third.items, 10),
};
try in.streamExact(&payload.writer, total_bytes);
@@ -830,11 +855,14 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}
test parseHPub {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
+ const expectEqual = std.testing.expectEqual;
{
- var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(std.testing.allocator, &in);
- defer res.hpub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.hpub.deinit(alloc);
+ try expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
@@ -847,14 +875,14 @@ test parseHPub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
{
- var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(std.testing.allocator, &in);
- defer res.hpub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.hpub.deinit(alloc);
+ try expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
@@ -867,14 +895,14 @@ test parseHPub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
{
- var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try parseHPub(std.testing.allocator, &in);
- defer res.hpub.deinit(std.testing.allocator);
- try std.testing.expectEqualDeep(
+ var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+ var res = try parseHPub(alloc, &in);
+ defer res.hpub.deinit(alloc);
+ try expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
@@ -887,18 +915,18 @@ test parseHPub {
},
res,
);
- try std.testing.expectEqual(0, in.buffered().len);
+ try expectEqual(0, in.buffered().len);
}
}
-fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
- var subject_list: std.ArrayList(u8) = .empty;
+fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
+ var subject_list: ArrayList(u8) = .empty;
errdefer subject_list.deinit(alloc);
// Handle the first character
{
const byte = try in.takeByte();
- if (std.ascii.isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
+ if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
return error.InvalidStream;
try subject_list.append(alloc, byte);
@@ -907,37 +935,33 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub
switch (pub_or_sub) {
.sub => {
while (in.takeByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- if (std.ascii.isAscii(byte)) {
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '>') {
- const next_byte = try in.takeByte();
- if (!std.ascii.isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '*') {
- const next_byte = try in.peekByte();
- if (next_byte != '.' and !std.ascii.isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
+ if (isWhitespace(byte)) break;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '>') {
+ const next_byte = try in.takeByte();
+ if (!isWhitespace(next_byte))
+ return error.InvalidStream;
+ } else if (byte == '*') {
+ const next_byte = try in.peekByte();
+ if (next_byte != '.' and !isWhitespace(next_byte))
+ return error.InvalidStream;
}
+ try subject_list.append(alloc, byte);
} else |err| return err;
},
.@"pub" => {
while (in.takeByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- if (std.ascii.isAscii(byte)) {
- if (byte == '*' or byte == '>') return error.InvalidStream;
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
+ if (isWhitespace(byte)) break;
+ if (byte == '*' or byte == '>') return error.InvalidStream;
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or isWhitespace(next_byte))
+ return error.InvalidStream;
}
+ try subject_list.append(alloc, byte);
} else |err| return err;
},
}
@@ -945,7 +969,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub
return subject_list.toOwnedSlice(alloc);
}
-inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
return error.InvalidStream;
@@ -953,12 +977,14 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void
}
test "parsing a stream" {
+ const alloc = std.testing.allocator;
+ const expectEqualDeep = std.testing.expectEqualDeep;
const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
"lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
"\nPUB hi 3\r\nfoo\r\n";
- var reader: std.Io.Reader = .fixed(input);
- var arena: std.heap.ArenaAllocator = .init(std.testing.allocator);
+ var reader: Reader = .fixed(input);
+ var arena: ArenaAllocator = .init(alloc);
defer arena.deinit();
const gpa = arena.allocator();
@@ -977,7 +1003,7 @@ test "parsing a stream" {
.no_responders = true,
} };
- try std.testing.expectEqualDeep(expected, msg);
+ try expectEqualDeep(expected, msg);
}
{
const msg: Message = try Message.next(gpa, &reader);
@@ -985,6 +1011,6 @@ test "parsing a stream" {
.subject = "hi",
.payload = "foo",
} };
- try std.testing.expectEqualDeep(expected, msg);
+ try expectEqualDeep(expected, msg);
}
}