From ca43a12b9b754d5a3c7e89b9bb8d622358db3795 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sun, 4 Jan 2026 21:09:35 -0500 Subject: Using separate queue for high throughput messages --- src/server/message_parser.zig | 60 +++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 28 deletions(-) (limited to 'src/server/message_parser.zig') diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 8b4859b..ff1a573 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -268,34 +268,7 @@ pub const Message = union(enum) { switch (operation) { .CONNECT => { - // for storing the json string - var connect_string_writer_allocating: AllocatingWriter = .init(alloc); - defer connect_string_writer_allocating.deinit(); - var connect_string_writer = &connect_string_writer_allocating.writer; - - // for parsing the json string - var connect_arena_allocator: ArenaAllocator = .init(alloc); - defer connect_arena_allocator.deinit(); - const connect_allocator = connect_arena_allocator.allocator(); - - try in.discardAll(1); // throw away space - - // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); - try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - - const connect_str = try connect_string_writer_allocating.toOwnedSlice(); - defer alloc.free(connect_str); - // TODO: should be CONNECTION allocator - const res = try std.json.parseFromSliceLeaky( - Connect, - connect_allocator, - connect_str, - .{ .allocate = .alloc_always }, - ); - - return .{ .CONNECT = try res.dupe(alloc) }; + return parseConnect(alloc, in); }, .PUB => { @branchHint(.likely); @@ -324,6 +297,37 @@ pub const Message = union(enum) { } }; +fn parseConnect(alloc: Allocator, in: *Reader) !Message { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; + + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); + + try in.discardAll(1); // throw away space + + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + // TODO: should be CONNECTION allocator + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return .{ .CONNECT = try res.dupe(alloc) }; +} + fn parseSub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space const subject = try readSubject(alloc, in, .sub); -- cgit