diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 09:37:24 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 09:37:47 -0500 |
| commit | 7bcc0c19aa713e2570d2c6b418f6ef27b79b8fda (patch) | |
| tree | b53db8abf77b34893c631fac767373adb034d809 /src | |
| parent | 72e7df5d5cc56b402f86dbb31b79703bb47dacf1 (diff) | |
Significant speed improvement
Diffstat (limited to 'src')
| -rw-r--r-- | src/Server.zig | 23 | ||||
| -rw-r--r-- | src/Server/Client.zig | 10 |
2 files changed, 13 insertions, 20 deletions
diff --git a/src/Server.zig b/src/Server.zig index f0ee1fe..7dc3783 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -39,15 +39,14 @@ const Subscription = struct { if (self.queue_group) |g| alloc.free(g); } - fn send(self: *Subscription, io: Io, bytes: []const []const u8) !void { - try self.queue_lock.lock(io); - defer self.queue_lock.unlock(io); + fn send(self: *Subscription, io: Io, buf: []u8, bytes: []const []const u8) !void { + var w: std.Io.Writer = .fixed(buf); for (bytes) |chunk| { - // Uncancelable because canceling the sender in the middle of this loop - // would put an invalid set series of bytes in the receivers queue. - _ = try self.queue.putUncancelable(io, chunk, chunk.len); + w.writeAll(chunk) catch unreachable; } - try io.checkCancel(); + try self.queue_lock.lock(io); + defer self.queue_lock.unlock(io); + try self.queue.putAll(io, w.buffered()); } }; @@ -181,6 +180,9 @@ fn handleConnection( var recv_queue: Queue(u8) = .init(qbuf); defer recv_queue.close(io); + const msg_write_buf: []u8 = try alloc.alignedAlloc(u8, .fromByteUnits(std.atomic.cache_line), 1 * 1024 * 1024); + defer alloc.free(msg_write_buf); + // Create client var client: Client = .init(null, &recv_queue, in, out); defer client.deinit(server_allocator); @@ -207,7 +209,7 @@ fn handleConnection( .PUB => { @branchHint(.likely); // log.debug("received a pub msg", .{}); - server.publishMessage(io, rand, server_allocator, &client, .@"pub") catch |err| switch (err) { + server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -215,7 +217,7 @@ fn handleConnection( }, .HPUB => { @branchHint(.likely); - server.publishMessage(io, rand, server_allocator, &client, .hpub) catch |err| switch (err) { + server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -298,6 +300,7 @@ fn publishMessage( io: Io, rand: std.Random, alloc: Allocator, + msg_write_buf: []u8, source_client: *Client, comptime pub_or_hpub: enum { @"pub", hpub }, ) !void { @@ -384,7 +387,7 @@ fn publishMessage( ); msg_chunks.appendAssumeCapacity(msg.payload); - subscription.send(io, msg_chunks.items[0..chunk_count]) catch |err| switch (err) { + subscription.send(io, msg_write_buf, msg_chunks.items[0..chunk_count]) catch |err| switch (err) { error.Closed => {}, error.Canceled => |e| return e, }; diff --git a/src/Server/Client.zig b/src/Server/Client.zig index 6ad1804..77034fd 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -40,16 +40,6 @@ pub fn start(self: *Client, io: std.Io) !void { std.debug.assert(self.to_client.end == 0); while (true) { self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); - // Wait 1 nanosecond to see if more data is in the queue. - // If there is, add it to the write buffer before sending it. - // The reason for this is because if we send the first chunk as soon as we get it, - // we will likely be sending a partial message, which will end up being way slower. - try io.sleep(.fromNanoseconds(1), .awake); - self.to_client.end += try self.recv_queue.get( - io, - self.to_client.buffer[self.to_client.end..], - 0, - ); try self.to_client.flush(); } } |
