summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig291
1 files changed, 248 insertions, 43 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 68e7a20..5c86261 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -35,11 +35,11 @@ pub const Message = union(MessageType) {
info: ServerInfo,
connect: Connect,
@"pub": Pub,
- hpub: void,
+ hpub: HPub,
sub: Sub,
unsub: Unsub,
msg: Msg,
- hmsg: void,
+ hmsg: HMsg,
ping,
pong,
@"+ok": void,
@@ -148,6 +148,36 @@ pub const Message = union(MessageType) {
return res.dupe(alloc);
}
};
+ pub const HPub = struct {
+ header_bytes: usize,
+ @"pub": Pub,
+
+ pub fn deinit(self: HPub, alloc: std.mem.Allocator) void {
+ self.@"pub".deinit(alloc);
+ }
+
+ pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg {
+ return .{
+ .header_bytes = self.header_bytes,
+ .msg = try self.@"pub".toMsg(alloc, sid),
+ };
+ }
+ };
+
+ pub const HMsg = struct {
+ header_bytes: usize,
+ msg: Msg,
+
+ pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void {
+ self.msg.deinit(alloc);
+ }
+
+ pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg {
+ var res = self;
+ res.msg = try self.msg.dupe(alloc);
+ return res;
+ }
+ };
pub const Sub = struct {
/// The subject name to subscribe to.
subject: []const u8,
@@ -235,6 +265,7 @@ pub const Message = union(MessageType) {
return error.InvalidOperation;
};
+ std.log.debug("parsing {s}", .{operation_string.items});
switch (operation) {
.connect => {
// for storing the json string
@@ -265,47 +296,10 @@ pub const Message = union(MessageType) {
return .{ .connect = try res.dupe(alloc) };
},
.@"pub" => {
- try in.discardAll(1); // throw away space
-
- // Parse subject
- const subject: []const u8 = try readSubject(alloc, in);
- errdefer alloc.free(subject);
-
- // Parse byte count
- const byte_count = blk: {
- var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
- defer byte_count_list.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
- }
- defer in.toss(1);
-
- if (std.ascii.isDigit(byte)) {
- try byte_count_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
- }
- } else |err| return err;
-
- break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
- };
-
- const payload = blk: {
- const bytes = try alloc.alloc(u8, byte_count);
- errdefer alloc.free(bytes);
- try in.readSliceAll(bytes);
- try expectStreamBytes(in, "\r\n");
- break :blk bytes;
- };
-
- return .{
- .@"pub" = .{
- .subject = subject,
- .payload = payload,
- },
- };
+ return parsePub(alloc, in);
+ },
+ .hpub => {
+ return parseHPub(alloc, in);
},
.ping => {
try expectStreamBytes(in, "\r\n");
@@ -414,6 +408,217 @@ pub const Message = union(MessageType) {
}
};
+fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in);
+ errdefer alloc.free(subject);
+
+ const second = blk: {
+ // Drop whitespace
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ var acc: std.ArrayList(u8) = .empty;
+ errdefer acc.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+ defer alloc.free(second);
+
+ const byte_count: usize, const reply_to: ?[]const u8 =
+ if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
+ try expectStreamBytes(in, "\r\n");
+ break :blk .{ s, null };
+ } else |_| .{
+ blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ try in.discardAll(1); // discard space
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ },
+ try alloc.dupe(u8, second),
+ };
+
+ const payload = blk: {
+ const bytes = try alloc.alloc(u8, byte_count);
+ errdefer alloc.free(bytes);
+ try in.readSliceAll(bytes);
+ try expectStreamBytes(in, "\r\n");
+ break :blk bytes;
+ };
+
+ return .{
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ };
+}
+
+test parsePub {
+ {
+ var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+
+ {
+ var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+
+ // numeric reply subject
+ {
+ var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+}
+
+fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in);
+ errdefer alloc.free(subject);
+
+ const second = blk: {
+ // Drop whitespace
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ var acc: std.ArrayList(u8) = .empty;
+ errdefer acc.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+ errdefer alloc.free(second);
+
+ const header_byte_count: usize, const reply_to: ?[]const u8 =
+ if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
+ try expectStreamBytes(in, "\r\n");
+ break :blk .{ s, null };
+ } else |_| .{
+ blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ try in.discardAll(1); // discard space
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ },
+ second,
+ };
+
+ std.log.debug("buffered: '{s}'", .{in.buffered()});
+
+ // Parse byte count
+ const byte_count = blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ };
+
+ const payload = blk: {
+ const bytes = try alloc.alloc(u8, byte_count);
+ errdefer alloc.free(bytes);
+ try in.readSliceAll(bytes);
+ try expectStreamBytes(in, "\r\n");
+ break :blk bytes;
+ };
+
+ return .{
+ .hpub = .{
+ .header_bytes = header_byte_count,
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ },
+ };
+}
+
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
errdefer subject_list.deinit(alloc);