summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/message_parser.zig366
1 files changed, 261 insertions, 105 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 0d60df5..3adf704 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -168,6 +168,10 @@ pub const Message = union(enum) {
sid: []const u8,
/// A number of messages to wait for before automatically unsubscribing.
max_msgs: ?usize = null,
+
+ pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void {
+ alloc.free(self.sid);
+ }
};
pub const Msg = struct {
subject: []const u8,
@@ -231,6 +235,8 @@ pub const Message = union(enum) {
break :blk .initBuffer(&buf);
};
+ std.log.debug("buffered: '{s}'", .{in.buffered()});
+
while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) {
try operation_string.appendBounded(byte);
@@ -239,9 +245,12 @@ pub const Message = union(enum) {
} else |err| return err;
const operation = parse(operation_string.items) orelse {
+ std.log.err("Invalid operation: '{s}'", .{operation_string.items});
return error.InvalidOperation;
};
+ errdefer std.log.err("Failed to parse {s}", .{operation_string.items});
+
switch (operation) {
.connect => {
// for storing the json string
@@ -290,104 +299,239 @@ pub const Message = union(enum) {
return .pong;
},
.sub => {
- if (!std.ascii.isWhitespace(try in.takeByte())) {
- @branchHint(.unlikely);
- return error.InvalidStream;
- }
- const subject = try readSubject(alloc, in, .sub);
- errdefer alloc.free(subject);
- const second = blk: {
- // Drop whitespace
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- in.toss(1);
- } else break;
- } else |err| return err;
-
- var acc: std.ArrayList(u8) = .empty;
- errdefer acc.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try acc.append(alloc, byte);
- in.toss(1);
- } else |err| return err;
-
- break :blk try acc.toOwnedSlice(alloc);
- };
- errdefer alloc.free(second);
- const queue_group = if ((try in.peekByte()) != '\r') second else null;
- // We do not need an errdefer free for queue group, because it will only be second (already has errdefer free) or null.
- const sid = if (queue_group) |_| try alloc.dupe(u8, try in.takeDelimiterExclusive('\r')) else second;
- // if queue_group is null, that means sid is second, and already has an errdefer free.
- errdefer if (queue_group) |_| alloc.free(sid);
- try expectStreamBytes(in, "\r\n");
- return .{
- .sub = .{
- .subject = subject,
- .queue_group = queue_group,
- .sid = sid,
- },
- };
+ return parseSub(alloc, in);
},
.unsub => {
- if (!std.ascii.isWhitespace(try in.takeByte())) {
- @branchHint(.unlikely);
- return error.InvalidStream;
- }
- // Parse sid
- const sid = blk: {
- var acc: std.ArrayList(u8) = .empty;
- errdefer acc.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try acc.append(alloc, byte);
- in.toss(1);
- } else |err| return err;
- break :blk try acc.toOwnedSlice(alloc);
- };
- errdefer alloc.free(sid);
-
- if ((try in.peekByte()) == '\r') {
- try expectStreamBytes(in, "\r\n");
- return .{
- .unsub = .{
- .sid = sid,
- },
- };
- } else if (std.ascii.isWhitespace(try in.peekByte())) {
- in.toss(1);
- const max_msgs = blk: {
- var max_msgs_list: std.ArrayList(u8) = .empty;
- errdefer max_msgs_list.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
- }
-
- if (std.ascii.isDigit(byte)) {
- try max_msgs_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
- }
- } else |err| return err;
-
- break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
- };
-
- return .{
- .unsub = .{
- .sid = sid,
- .max_msgs = max_msgs,
- },
- };
- } else return error.InvalidStream;
+ return parseUnsub(alloc, in);
},
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
}
};
+fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+ const subject = try readSubject(alloc, in, .sub);
+
+ const States = enum {
+ before_second,
+ in_second,
+ after_second,
+ in_third,
+ in_end,
+ };
+
+ var second: std.ArrayList(u8) = .empty;
+ errdefer second.deinit(alloc);
+ var third: ?std.ArrayList(u8) = null;
+ errdefer if (third) |*t| t.deinit(alloc);
+
+ sw: switch (@as(States, .before_second)) {
+ .before_second => {
+ const byte = try in.peekByte();
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_second;
+ }
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (!std.ascii.isWhitespace(byte)) {
+ try second.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_second;
+ }
+ continue :sw .after_second;
+ },
+ .after_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_second;
+ }
+ third = .empty;
+ continue :sw .in_third;
+ },
+ .in_third => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ }
+ try third.?.append(alloc, byte);
+ in.toss(1);
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
+ }
+
+ return .{
+ .sub = .{
+ .subject = subject,
+ .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
+ .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
+ },
+ };
+}
+
+test parseSub {
+ {
+ var in: std.Io.Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: std.Io.Reader = .fixed(" foo 1\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: std.Io.Reader = .fixed(" foo q 1\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: std.Io.Reader = .fixed(" 1 q 1\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "1",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ },
+ res,
+ );
+ }
+ {
+ var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "$SRV.PING",
+ .queue_group = null,
+ .sid = "4",
+ },
+ },
+ res,
+ );
+ }
+}
+
+fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+ var first: std.ArrayList(u8) = .empty;
+ errdefer first.deinit(alloc);
+
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try first.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ while (in.peekByte()) |byte| {
+ if (!std.ascii.isWhitespace(byte) or byte == '\r') break;
+ in.toss(1);
+ } else |err| return err;
+
+ if (try in.peekByte() == '\r') {
+ try expectStreamBytes(in, "\r\n");
+ return .{
+ .unsub = .{
+ .sid = try first.toOwnedSlice(alloc),
+ },
+ };
+ } else {
+ var second: std.ArrayList(u8) = .empty;
+ defer second.deinit(alloc);
+
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try second.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ try expectStreamBytes(in, "\r\n");
+ return .{
+ .unsub = .{
+ .max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10),
+ .sid = try first.toOwnedSlice(alloc),
+ },
+ };
+ }
+}
+
+test parseUnsub {
+ {
+ var in: std.Io.Reader = .fixed(" 1\r\n");
+ var res = try parseUnsub(std.testing.allocator, &in);
+ defer res.unsub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .unsub = .{
+ .sid = "1",
+ .max_msgs = null,
+ },
+ },
+ res,
+ );
+ try std.testing.expectEqual(0, in.buffered().len);
+ }
+
+ {
+ var in: std.Io.Reader = .fixed(" 1 1\r\n");
+ var res = try parseUnsub(std.testing.allocator, &in);
+ defer res.unsub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .unsub = .{
+ .sid = "1",
+ .max_msgs = 1,
+ },
+ },
+ res,
+ );
+ try std.testing.expectEqual(0, in.buffered().len);
+ }
+}
+
fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space
@@ -483,13 +627,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
- Message{ .@"pub" = .{
- .subject = "foo",
- .reply_to = null,
- .payload = "bar",
- } },
+ Message{
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "bar",
+ },
+ },
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
{
@@ -497,13 +644,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
- Message{ .@"pub" = .{
- .subject = "foo",
- .reply_to = "reply.to",
- .payload = "bar",
- } },
+ Message{
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "bar",
+ },
+ },
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
// numeric reply subject
@@ -512,13 +662,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
- Message{ .@"pub" = .{
- .subject = "foo",
- .reply_to = "5",
- .payload = "bar",
- } },
+ Message{
+ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "5",
+ .payload = "bar",
+ },
+ },
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
}
@@ -658,6 +811,7 @@ test parseHPub {
},
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
{
@@ -677,6 +831,7 @@ test parseHPub {
},
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
{
@@ -696,6 +851,7 @@ test parseHPub {
},
res,
);
+ try std.testing.expectEqual(0, in.buffered().len);
}
}