summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig59
1 files changed, 57 insertions, 2 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 5669d2a..0ff3dd4 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -38,7 +38,7 @@ pub const Message = union(MessageType) {
@"pub": Pub,
hpub: void,
sub: Sub,
- unsub: void,
+ unsub: Unsub,
msg: Msg,
hmsg: void,
ping,
@@ -117,6 +117,12 @@ pub const Message = union(MessageType) {
/// A unique alphanumeric subscription ID, generated by the client.
sid: []const u8,
};
+ pub const Unsub = struct {
+ /// The unique alphanumeric subscription ID of the subject to unsubscribe from.
+ sid: []const u8,
+ /// A number of messages to wait for before automatically unsubscribing.
+ max_msgs: ?usize = null,
+ };
pub const Msg = struct {
subject: []const u8,
sid: []const u8,
@@ -225,7 +231,7 @@ pub const Message = union(MessageType) {
}
} else return error.InvalidStream;
- break :blk try std.fmt.parseUnsigned(u64, byte_count_list.items, 10);
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
};
const payload = blk: {
@@ -280,6 +286,55 @@ pub const Message = union(MessageType) {
},
};
},
+ .unsub => {
+ std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
+ // Parse byte count
+ const sid = blk: {
+ var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
+ while (in.peekByte() catch null) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ in.toss(1);
+ } else return error.InvalidStream;
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+
+ if ((try in.peekByte()) == '\r') {
+ std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ return .{
+ .unsub = .{
+ .sid = sid,
+ },
+ };
+ } else if (std.ascii.isWhitespace(try in.peekByte())) {
+ in.toss(1);
+ const max_msgs = blk: {
+ var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
+ while (in.takeByte() catch null) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ std.debug.assert(byte == '\r');
+ std.debug.assert(try in.takeByte() == '\n');
+ break;
+ }
+
+ if (std.ascii.isDigit(byte)) {
+ try max_msgs_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else return error.InvalidStream;
+
+ break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
+ };
+
+ return .{
+ .unsub = .{
+ .sid = sid,
+ .max_msgs = max_msgs,
+ },
+ };
+ } else return error.InvalidStream;
+ },
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
}