diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-02 16:01:35 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-02 18:10:25 +0000 |
| commit | cd5281030ee6cede5a39f8360d47c6c9ed9269d3 (patch) | |
| tree | b639b5f6d9e9fdbdb6d6a0b080f8886f3eec05f8 /src/server/message_parser.zig | |
| parent | 2be370e379959e2763e70851cf14ecfca07754fc (diff) | |
Diffstat (limited to 'src/server/message_parser.zig')
| -rw-r--r-- | src/server/message_parser.zig | 291 |
1 files changed, 248 insertions, 43 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 68e7a20..5c86261 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -35,11 +35,11 @@ pub const Message = union(MessageType) { info: ServerInfo, connect: Connect, @"pub": Pub, - hpub: void, + hpub: HPub, sub: Sub, unsub: Unsub, msg: Msg, - hmsg: void, + hmsg: HMsg, ping, pong, @"+ok": void, @@ -148,6 +148,36 @@ pub const Message = union(MessageType) { return res.dupe(alloc); } }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: std.mem.Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + return res; + } + }; pub const Sub = struct { /// The subject name to subscribe to. subject: []const u8, @@ -235,6 +265,7 @@ pub const Message = union(MessageType) { return error.InvalidOperation; }; + std.log.debug("parsing {s}", .{operation_string.items}); switch (operation) { .connect => { // for storing the json string @@ -265,47 +296,10 @@ pub const Message = union(MessageType) { return .{ .connect = try res.dupe(alloc) }; }, .@"pub" => { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in); - errdefer alloc.free(subject); - - // Parse byte count - const byte_count = blk: { - var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - defer byte_count_list.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; - } - defer in.toss(1); - - if (std.ascii.isDigit(byte)) { - try byte_count_list.append(alloc, byte); - } else { - return error.InvalidStream; - } - } else |err| return err; - - break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); - }; - - const payload = blk: { - const bytes = try alloc.alloc(u8, byte_count); - errdefer alloc.free(bytes); - try in.readSliceAll(bytes); - try expectStreamBytes(in, "\r\n"); - break :blk bytes; - }; - - return .{ - .@"pub" = .{ - .subject = subject, - .payload = payload, - }, - }; + return parsePub(alloc, in); + }, + .hpub => { + return parseHPub(alloc, in); }, .ping => { try expectStreamBytes(in, "\r\n"); @@ -414,6 +408,217 @@ pub const Message = union(MessageType) { } }; +fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in); + errdefer alloc.free(subject); + + const second = blk: { + // Drop whitespace + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + } else break; + } else |err| return err; + + var acc: std.ArrayList(u8) = .empty; + errdefer acc.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + in.toss(1); + } else |err| return err; + + break :blk try acc.toOwnedSlice(alloc); + }; + defer alloc.free(second); + + const byte_count: usize, const reply_to: ?[]const u8 = + if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { + try expectStreamBytes(in, "\r\n"); + break :blk .{ s, null }; + } else |_| .{ + blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + try in.discardAll(1); // discard space + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }, + try alloc.dupe(u8, second), + }; + + const payload = blk: { + const bytes = try alloc.alloc(u8, byte_count); + errdefer alloc.free(bytes); + try in.readSliceAll(bytes); + try expectStreamBytes(in, "\r\n"); + break :blk bytes; + }; + + return .{ + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }; +} + +test parsePub { + { + var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "bar", + } }, + res, + ); + } + + { + var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "bar", + } }, + res, + ); + } + + // numeric reply subject + { + var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "bar", + } }, + res, + ); + } +} + +fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in); + errdefer alloc.free(subject); + + const second = blk: { + // Drop whitespace + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + } else break; + } else |err| return err; + + var acc: std.ArrayList(u8) = .empty; + errdefer acc.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + in.toss(1); + } else |err| return err; + + break :blk try acc.toOwnedSlice(alloc); + }; + errdefer alloc.free(second); + + const header_byte_count: usize, const reply_to: ?[]const u8 = + if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { + try expectStreamBytes(in, "\r\n"); + break :blk .{ s, null }; + } else |_| .{ + blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + try in.discardAll(1); // discard space + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }, + second, + }; + + std.log.debug("buffered: '{s}'", .{in.buffered()}); + + // Parse byte count + const byte_count = blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }; + + const payload = blk: { + const bytes = try alloc.alloc(u8, byte_count); + errdefer alloc.free(bytes); + try in.readSliceAll(bytes); + try expectStreamBytes(in, "\r\n"); + break :blk bytes; + }; + + return .{ + .hpub = .{ + .header_bytes = header_byte_count, + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }, + }; +} + fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); errdefer subject_list.deinit(alloc); |
