summaryrefslogtreecommitdiff
path: root/src/Server.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-12 19:18:45 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-12 22:21:48 -0500
commit1ee0f9263cd8e2984237ff34ae4625e8aaf2680c (patch)
treee91d4fa1f6c890f02cf9fbaebe3912e3774777d2 /src/Server.zig
parentc5ad98adc6ebea6627fc08c2c16324610a8a97e0 (diff)
Properly handle large messagesHEADdev
add max bytes setting
Diffstat (limited to 'src/Server.zig')
-rw-r--r--src/Server.zig54
1 files changed, 44 insertions, 10 deletions
diff --git a/src/Server.zig b/src/Server.zig
index e0aca28..0259d8f 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -39,8 +39,17 @@ const Subscription = struct {
if (self.queue_group) |g| alloc.free(g);
}
- fn send(self: *Subscription, io: Io, buf: []u8, bytes: []const []const u8) !void {
- var w: std.Io.Writer = .fixed(buf);
+ fn send(self: *Subscription, io: Io, hot_buf: *align(std.atomic.cache_line) [512]u8, buf: []u8, bytes: []const []const u8) !void {
+ const total_len = blk: {
+ var total_len: usize = 0;
+ for (bytes) |chunk| {
+ total_len += chunk.len;
+ }
+ break :blk total_len;
+ };
+ log.debug("Payload len: {d}", .{bytes[bytes.len - 1].len});
+ var w: std.Io.Writer = .fixed(if (total_len <= hot_buf.len) hot_buf else buf);
+ log.debug("Using buffer size: {d}", .{w.buffer.len});
for (bytes) |chunk| {
w.writeAll(chunk) catch unreachable;
}
@@ -171,8 +180,7 @@ fn handleConnection(
const out = &writer.interface;
// Set up client reader
- _ = r_buf_size;
- const r_buffer: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), 64 * 1024 * 1024);
+ const r_buffer: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), r_buf_size);
defer alloc.free(r_buffer);
var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
@@ -183,7 +191,8 @@ fn handleConnection(
var recv_queue: Queue(u8) = .init(qbuf);
defer recv_queue.close(io);
- const msg_write_buf: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), 1 * 1024 * 1024);
+ var hot_msg_buf: [std.atomic.cache_line * 4]u8 align(std.atomic.cache_line) = undefined;
+ const msg_write_buf: []u8 = try alloc.alloc(u8, server.info.max_payload);
defer alloc.free(msg_write_buf);
// Create client
@@ -212,7 +221,15 @@ fn handleConnection(
.PUB => {
@branchHint(.likely);
const before = try clock.now(io);
- server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) {
+ server.publishMessage(
+ io,
+ rand,
+ server_allocator,
+ &hot_msg_buf,
+ msg_write_buf,
+ &client,
+ .@"pub",
+ ) catch |err| switch (err) {
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
@@ -225,7 +242,15 @@ fn handleConnection(
.HPUB => {
@branchHint(.likely);
const before = try clock.now(io);
- server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) {
+ server.publishMessage(
+ io,
+ rand,
+ server_allocator,
+ &hot_msg_buf,
+ msg_write_buf,
+ &client,
+ .hpub,
+ ) catch |err| switch (err) {
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
@@ -312,6 +337,7 @@ fn publishMessage(
io: Io,
rand: std.Random,
alloc: Allocator,
+ hot_write_buf: *align(std.atomic.cache_line) [512]u8,
msg_write_buf: []u8,
source_client: *Client,
comptime pub_or_hpub: enum { @"pub", hpub },
@@ -325,13 +351,16 @@ fn publishMessage(
}
};
+ var big_msg_arena_allocator: std.heap.ArenaAllocator = .init(alloc);
+ defer big_msg_arena_allocator.deinit();
+
const hpubmsg = switch (pub_or_hpub) {
.@"pub" => {},
- .hpub => try parse.hpub(source_client.from_client),
+ .hpub => try parse.hpub(source_client.from_client, &big_msg_arena_allocator),
};
const msg: Message.Pub = switch (pub_or_hpub) {
- .@"pub" => try parse.@"pub"(source_client.from_client),
+ .@"pub" => try parse.@"pub"(source_client.from_client, &big_msg_arena_allocator),
.hpub => hpubmsg.@"pub",
};
@@ -399,7 +428,12 @@ fn publishMessage(
);
msg_chunks.appendAssumeCapacity(msg.payload);
- subscription.send(io, msg_write_buf, msg_chunks.items[0..chunk_count]) catch |err| switch (err) {
+ subscription.send(
+ io,
+ hot_write_buf,
+ msg_write_buf,
+ msg_chunks.items[0..chunk_count],
+ ) catch |err| switch (err) {
error.Closed => {},
error.Canceled => |e| return e,
};