diff options
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 5669d2a..0ff3dd4 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -38,7 +38,7 @@ pub const Message = union(MessageType) { @"pub": Pub, hpub: void, sub: Sub, - unsub: void, + unsub: Unsub, msg: Msg, hmsg: void, ping, @@ -117,6 +117,12 @@ pub const Message = union(MessageType) { /// A unique alphanumeric subscription ID, generated by the client. sid: []const u8, }; + pub const Unsub = struct { + /// The unique alphanumeric subscription ID of the subject to unsubscribe from. + sid: []const u8, + /// A number of messages to wait for before automatically unsubscribing. + max_msgs: ?usize = null, + }; pub const Msg = struct { subject: []const u8, sid: []const u8, @@ -225,7 +231,7 @@ pub const Message = union(MessageType) { } } else return error.InvalidStream; - break :blk try std.fmt.parseUnsigned(u64, byte_count_list.items, 10); + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); }; const payload = blk: { @@ -280,6 +286,55 @@ pub const Message = union(MessageType) { }, }; }, + .unsub => { + std.debug.assert(std.ascii.isWhitespace(try in.takeByte())); + // Parse byte count + const sid = blk: { + var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8); + while (in.peekByte() catch null) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + in.toss(1); + } else return error.InvalidStream; + break :blk try acc.toOwnedSlice(alloc); + }; + + if ((try in.peekByte()) == '\r') { + std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n")); + return .{ + .unsub = .{ + .sid = sid, + }, + }; + } else if (std.ascii.isWhitespace(try in.peekByte())) { + in.toss(1); + const max_msgs = blk: { + var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); + while (in.takeByte() catch null) |byte| { + if (std.ascii.isWhitespace(byte)) { + std.debug.assert(byte == '\r'); + std.debug.assert(try in.takeByte() == '\n'); + break; + } + + if (std.ascii.isDigit(byte)) { + try max_msgs_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else return error.InvalidStream; + + break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10); + }; + + return .{ + .unsub = .{ + .sid = sid, + .max_msgs = max_msgs, + }, + }; + } else return error.InvalidStream; + }, else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), } } |
