summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/message_parser.zig18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 50e4e41..41f309a 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -190,7 +190,7 @@ pub const Message = union(MessageType) {
// Should read the next JSON object to the fixed buffer writer.
_ = try in.streamDelimiter(&connect_string_writer, '}');
try connect_string_writer.writeByte('}');
- try assertStreamBytes(in, "}\r\n"); // discard '}\r\n'
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
// TODO: should be CONNECTION allocator
const res = try std.json.parseFromSliceLeaky(Connect, connect_allocator, connect_string_writer.buffered(), .{ .allocate = .alloc_always });
@@ -208,7 +208,7 @@ pub const Message = union(MessageType) {
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
@@ -226,7 +226,7 @@ pub const Message = union(MessageType) {
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
try in.readSliceAll(bytes);
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
@@ -238,11 +238,11 @@ pub const Message = union(MessageType) {
};
},
.ping => {
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
return .ping;
},
.pong => {
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
return .pong;
},
.sub => {
@@ -270,7 +270,7 @@ pub const Message = union(MessageType) {
};
const queue_group = if ((try in.peekByte()) != '\r') second else null;
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
return .{
.sub = .{
.subject = subject,
@@ -296,7 +296,7 @@ pub const Message = union(MessageType) {
};
if ((try in.peekByte()) == '\r') {
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
return .{
.unsub = .{
.sid = sid,
@@ -308,7 +308,7 @@ pub const Message = union(MessageType) {
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
- try assertStreamBytes(in, "\r\n");
+ try expectStreamBytes(in, "\r\n");
break;
}
@@ -390,7 +390,7 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub {
};
}
-inline fn assertStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
+inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
return error.InvalidStream;