summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-04 21:09:35 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-04 23:36:44 -0500
commitca43a12b9b754d5a3c7e89b9bb8d622358db3795 (patch)
tree07edb13cf54115aa73ad1fef1f722e3357b82d2e /src/server/message_parser.zig
parent69528a1b72bc578430e3a3e12d7cd63680986c29 (diff)
Using separate queue for high throughput messages
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig60
1 files changed, 32 insertions, 28 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 8b4859b..ff1a573 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -268,34 +268,7 @@ pub const Message = union(enum) {
switch (operation) {
.CONNECT => {
- // for storing the json string
- var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
- defer connect_string_writer_allocating.deinit();
- var connect_string_writer = &connect_string_writer_allocating.writer;
-
- // for parsing the json string
- var connect_arena_allocator: ArenaAllocator = .init(alloc);
- defer connect_arena_allocator.deinit();
- const connect_allocator = connect_arena_allocator.allocator();
-
- try in.discardAll(1); // throw away space
-
- // Should read the next JSON object to the fixed buffer writer.
- _ = try in.streamDelimiter(connect_string_writer, '}');
- try connect_string_writer.writeByte('}');
- try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
-
- const connect_str = try connect_string_writer_allocating.toOwnedSlice();
- defer alloc.free(connect_str);
- // TODO: should be CONNECTION allocator
- const res = try std.json.parseFromSliceLeaky(
- Connect,
- connect_allocator,
- connect_str,
- .{ .allocate = .alloc_always },
- );
-
- return .{ .CONNECT = try res.dupe(alloc) };
+ return parseConnect(alloc, in);
},
.PUB => {
@branchHint(.likely);
@@ -324,6 +297,37 @@ pub const Message = union(enum) {
}
};
+fn parseConnect(alloc: Allocator, in: *Reader) !Message {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
+
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
+
+ try in.discardAll(1); // throw away space
+
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ // TODO: should be CONNECTION allocator
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return .{ .CONNECT = try res.dupe(alloc) };
+}
+
fn parseSub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);