summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2025-12-29 01:34:10 +0000
committerRobby Zambito <contact@robbyzambito.me>2025-12-29 04:23:16 +0000
commit4bf5ddca1508fc485238d9bfebfe67740a7668b1 (patch)
treeddc7438bbe4638598a28272bb48c13a5940e8b04 /src/server/message_parser.zig
parent335c4aa092b2ba1b8233c8c4b25d98c8f01f584e (diff)
publish works
starting to use errors instead of unreachable for stream parsing
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig44
1 files changed, 26 insertions, 18 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index ed5bc76..f99dfcb 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -206,7 +206,7 @@ pub const Message = union(MessageType) {
// Parse byte count
const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
- while (in.takeByte() catch null) |byte| {
+ while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
std.debug.assert(byte == '\r');
std.debug.assert(try in.takeByte() == '\n');
@@ -218,7 +218,7 @@ pub const Message = union(MessageType) {
} else {
return error.InvalidStream;
}
- } else return error.InvalidStream;
+ } else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
};
@@ -226,7 +226,7 @@ pub const Message = union(MessageType) {
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
try in.readSliceAll(bytes);
- std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
break :blk bytes;
};
@@ -238,34 +238,35 @@ pub const Message = union(MessageType) {
};
},
.ping => {
- std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
return .ping;
},
.pong => {
- std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
return .pong;
},
.sub => {
- std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
+ try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte()));
const subject = try readSubject(alloc, in);
const second = blk: {
// Drop whitespace
- while (in.peekByte() catch null) |byte| {
+ while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
} else break;
- } else return error.InvalidStream;
+ } else |err| return err;
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32);
- while (in.takeByte() catch null) |byte| {
+ while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
- } else return error.InvalidStream;
+ } else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
const queue_group = if ((try in.peekByte()) != '\r') second else null;
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
+ std.debug.print("SID is '{s}'\n", .{sid});
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
return .{
.sub = .{
@@ -276,20 +277,20 @@ pub const Message = union(MessageType) {
};
},
.unsub => {
- std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
+ try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte()));
// Parse byte count
const sid = blk: {
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
- while (in.peekByte() catch null) |byte| {
+ while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
in.toss(1);
- } else return error.InvalidStream;
+ } else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
if ((try in.peekByte()) == '\r') {
- std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
return .{
.unsub = .{
.sid = sid,
@@ -299,7 +300,7 @@ pub const Message = union(MessageType) {
in.toss(1);
const max_msgs = blk: {
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
- while (in.takeByte() catch null) |byte| {
+ while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
std.debug.assert(byte == '\r');
std.debug.assert(try in.takeByte() == '\n');
@@ -311,7 +312,7 @@ pub const Message = union(MessageType) {
} else {
return error.InvalidStream;
}
- } else return error.InvalidStream;
+ } else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
};
@@ -343,7 +344,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
try subject_list.append(alloc, byte);
}
- while (in.takeByte() catch null) |byte| {
+ while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
if (std.ascii.isAscii(byte)) {
if (byte == '.') {
@@ -353,7 +354,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
}
try subject_list.append(alloc, byte);
}
- } else return error.InvalidStream;
+ } else |err| return err;
return subject_list.toOwnedSlice(alloc);
}
@@ -385,6 +386,13 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub {
};
}
+inline fn assertStreamBytes(cond: bool) !void {
+ if (!cond) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
+
// try returning error in debug mode, only null in release?
// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message {
// const message_type: MessageType = blk: {