From 4bf5ddca1508fc485238d9bfebfe67740a7668b1 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Mon, 29 Dec 2025 01:34:10 +0000 Subject: publish works starting to use errors instead of unreachable for stream parsing --- src/server/client.zig | 11 ----------- src/server/main.zig | 1 + src/server/message_parser.zig | 44 +++++++++++++++++++++++++------------------ 3 files changed, 27 insertions(+), 29 deletions(-) (limited to 'src/server') diff --git a/src/server/client.zig b/src/server/client.zig index c7aa41a..8ad1a7a 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -109,17 +109,6 @@ pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { } fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { - std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); - std.debug.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n-\n\n\n", - .{ - msg.subject, - msg.sid, - msg.reply_to orelse "", - msg.payload.len, - msg.payload, - }, - ); try out.print( "MSG {s} {s} {s} {d}\r\n{s}\r\n", .{ diff --git a/src/server/main.zig b/src/server/main.zig index f3702e9..aa452bc 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -180,6 +180,7 @@ fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void { } fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { + std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg}); var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty; try subs_for_subject.put(gpa, id, msg.sid); try server.subscriptions.put(gpa, msg.subject, subs_for_subject); diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index ed5bc76..f99dfcb 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -206,7 +206,7 @@ pub const Message = union(MessageType) { // Parse byte count const byte_count = blk: { var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) { std.debug.assert(byte == '\r'); std.debug.assert(try in.takeByte() == '\n'); @@ -218,7 +218,7 @@ pub const Message = union(MessageType) { } else { return error.InvalidStream; } - } else return error.InvalidStream; + } else |err| return err; break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); }; @@ -226,7 +226,7 @@ pub const Message = union(MessageType) { const payload = blk: { const bytes = try alloc.alloc(u8, byte_count); try in.readSliceAll(bytes); - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); break :blk bytes; }; @@ -238,34 +238,35 @@ pub const Message = union(MessageType) { }; }, .ping => { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .ping; }, .pong => { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .pong; }, .sub => { - std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); const subject = try readSubject(alloc, in); const second = blk: { // Drop whitespace - while (in.peekByte() catch null) |byte| { + while (in.peekByte()) |byte| { if (std.ascii.isWhitespace(byte)) { in.toss(1); } else break; - } else return error.InvalidStream; + } else |err| return err; var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; try acc.append(alloc, byte); - } else return error.InvalidStream; + } else |err| return err; break :blk try acc.toOwnedSlice(alloc); }; const queue_group = if ((try in.peekByte()) != '\r') second else null; const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second; + std.debug.print("SID is '{s}'\n", .{sid}); std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); return .{ .sub = .{ @@ -276,20 +277,20 @@ pub const Message = union(MessageType) { }; }, .unsub => { - std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); // Parse byte count const sid = blk: { var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8); - while (in.peekByte() catch null) |byte| { + while (in.peekByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; try acc.append(alloc, byte); in.toss(1); - } else return error.InvalidStream; + } else |err| return err; break :blk try acc.toOwnedSlice(alloc); }; if ((try in.peekByte()) == '\r') { - std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); return .{ .unsub = .{ .sid = sid, @@ -299,7 +300,7 @@ pub const Message = union(MessageType) { in.toss(1); const max_msgs = blk: { var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) { std.debug.assert(byte == '\r'); std.debug.assert(try in.takeByte() == '\n'); @@ -311,7 +312,7 @@ pub const Message = union(MessageType) { } else { return error.InvalidStream; } - } else return error.InvalidStream; + } else |err| return err; break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10); }; @@ -343,7 +344,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { try subject_list.append(alloc, byte); } - while (in.takeByte() catch null) |byte| { + while (in.takeByte()) |byte| { if (std.ascii.isWhitespace(byte)) break; if (std.ascii.isAscii(byte)) { if (byte == '.') { @@ -353,7 +354,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { } try subject_list.append(alloc, byte); } - } else return error.InvalidStream; + } else |err| return err; return subject_list.toOwnedSlice(alloc); } @@ -385,6 +386,13 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub { }; } +inline fn assertStreamBytes(cond: bool) !void { + if (!cond) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + // try returning error in debug mode, only null in release? // pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { // const message_type: MessageType = blk: { -- cgit