diff options
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index ed5bc76..f99dfcb 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -206,7 +206,7 @@ pub const Message = union(MessageType) { // Parse byte count const byte_count = blk: { var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) { std.debug.assert(byte == '\r'); std.debug.assert(try in.takeByte() == '\n'); @@ -218,7 +218,7 @@ pub const Message = union(MessageType) { } else { return error.InvalidStream; } - } else return error.InvalidStream; + } else |err| return err; break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); }; @@ -226,7 +226,7 @@ pub const Message = union(MessageType) { const payload = blk: { const bytes = try alloc.alloc(u8, byte_count); try in.readSliceAll(bytes); - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); break :blk bytes; }; @@ -238,34 +238,35 @@ pub const Message = union(MessageType) { }; }, .ping => { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .ping; }, .pong => { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .pong; }, .sub => { - std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); const subject = try readSubject(alloc, in); const second = blk: { // Drop whitespace - while (in.peekByte() catch null) |byte| { + while (in.peekByte()) |byte| { if (std.ascii.isWhitespace(byte)) { in.toss(1); } else break; - } else return error.InvalidStream; + } else |err| return err; var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; try acc.append(alloc, byte); - } else return error.InvalidStream; + } else |err| return err; break :blk try acc.toOwnedSlice(alloc); }; const queue_group = if ((try in.peekByte()) != '\r') second else null; const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second; + std.debug.print("SID is '{s}'\n", .{sid}); std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); return .{ .sub = .{ @@ -276,20 +277,20 @@ pub const Message = union(MessageType) { }; }, .unsub => { - std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); // Parse byte count const sid = blk: { var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8); - while (in.peekByte() catch null) |byte| { + while (in.peekByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; try acc.append(alloc, byte); in.toss(1); - } else return error.InvalidStream; + } else |err| return err; break :blk try acc.toOwnedSlice(alloc); }; if ((try in.peekByte()) == '\r') { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .{ .unsub = .{ .sid = sid, @@ -299,7 +300,7 @@ pub const Message = union(MessageType) { in.toss(1); const max_msgs = blk: { var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) { std.debug.assert(byte == '\r'); std.debug.assert(try in.takeByte() == '\n'); @@ -311,7 +312,7 @@ pub const Message = union(MessageType) { } else { return error.InvalidStream; } - } else return error.InvalidStream; + } else |err| return err; break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10); }; @@ -343,7 +344,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { try subject_list.append(alloc, byte); } - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; if (std.ascii.isAscii(byte)) { if (byte == '.') { @@ -353,7 +354,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { } try subject_list.append(alloc, byte); } - } else return error.InvalidStream; + } else |err| return err; return subject_list.toOwnedSlice(alloc); } @@ -385,6 +386,13 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub { }; } +inline fn assertStreamBytes(cond: bool) !void { + if (!cond) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + // try returning error in debug mode, only null in release? // pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { // const message_type: MessageType = blk: { |
