diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 19:18:45 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 22:21:48 -0500 |
| commit | 1ee0f9263cd8e2984237ff34ae4625e8aaf2680c (patch) | |
| tree | e91d4fa1f6c890f02cf9fbaebe3912e3774777d2 /src/Server.zig | |
| parent | c5ad98adc6ebea6627fc08c2c16324610a8a97e0 (diff) | |
add max bytes setting
Diffstat (limited to 'src/Server.zig')
| -rw-r--r-- | src/Server.zig | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/src/Server.zig b/src/Server.zig index e0aca28..0259d8f 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -39,8 +39,17 @@ const Subscription = struct { if (self.queue_group) |g| alloc.free(g); } - fn send(self: *Subscription, io: Io, buf: []u8, bytes: []const []const u8) !void { - var w: std.Io.Writer = .fixed(buf); + fn send(self: *Subscription, io: Io, hot_buf: *align(std.atomic.cache_line) [512]u8, buf: []u8, bytes: []const []const u8) !void { + const total_len = blk: { + var total_len: usize = 0; + for (bytes) |chunk| { + total_len += chunk.len; + } + break :blk total_len; + }; + log.debug("Payload len: {d}", .{bytes[bytes.len - 1].len}); + var w: std.Io.Writer = .fixed(if (total_len <= hot_buf.len) hot_buf else buf); + log.debug("Using buffer size: {d}", .{w.buffer.len}); for (bytes) |chunk| { w.writeAll(chunk) catch unreachable; } @@ -171,8 +180,7 @@ fn handleConnection( const out = &writer.interface; // Set up client reader - _ = r_buf_size; - const r_buffer: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), 64 * 1024 * 1024); + const r_buffer: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), r_buf_size); defer alloc.free(r_buffer); var reader = stream.reader(io, r_buffer); const in = &reader.interface; @@ -183,7 +191,8 @@ fn handleConnection( var recv_queue: Queue(u8) = .init(qbuf); defer recv_queue.close(io); - const msg_write_buf: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), 1 * 1024 * 1024); + var hot_msg_buf: [std.atomic.cache_line * 4]u8 align(std.atomic.cache_line) = undefined; + const msg_write_buf: []u8 = try alloc.alloc(u8, server.info.max_payload); defer alloc.free(msg_write_buf); // Create client @@ -212,7 +221,15 @@ fn handleConnection( .PUB => { @branchHint(.likely); const before = try clock.now(io); - server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) { + server.publishMessage( + io, + rand, + server_allocator, + &hot_msg_buf, + msg_write_buf, + &client, + .@"pub", + ) catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -225,7 +242,15 @@ fn handleConnection( .HPUB => { @branchHint(.likely); const before = try clock.now(io); - server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) { + server.publishMessage( + io, + rand, + server_allocator, + &hot_msg_buf, + msg_write_buf, + &client, + .hpub, + ) catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -312,6 +337,7 @@ fn publishMessage( io: Io, rand: std.Random, alloc: Allocator, + hot_write_buf: *align(std.atomic.cache_line) [512]u8, msg_write_buf: []u8, source_client: *Client, comptime pub_or_hpub: enum { @"pub", hpub }, @@ -325,13 +351,16 @@ fn publishMessage( } }; + var big_msg_arena_allocator: std.heap.ArenaAllocator = .init(alloc); + defer big_msg_arena_allocator.deinit(); + const hpubmsg = switch (pub_or_hpub) { .@"pub" => {}, - .hpub => try parse.hpub(source_client.from_client), + .hpub => try parse.hpub(source_client.from_client, &big_msg_arena_allocator), }; const msg: Message.Pub = switch (pub_or_hpub) { - .@"pub" => try parse.@"pub"(source_client.from_client), + .@"pub" => try parse.@"pub"(source_client.from_client, &big_msg_arena_allocator), .hpub => hpubmsg.@"pub", }; @@ -399,7 +428,12 @@ fn publishMessage( ); msg_chunks.appendAssumeCapacity(msg.payload); - subscription.send(io, msg_write_buf, msg_chunks.items[0..chunk_count]) catch |err| switch (err) { + subscription.send( + io, + hot_write_buf, + msg_write_buf, + msg_chunks.items[0..chunk_count], + ) catch |err| switch (err) { error.Closed => {}, error.Canceled => |e| return e, }; |
