From 4fcb9e3943b899b01bbb959dcf322af1855abf69 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Thu, 1 Jan 2026 03:07:19 +0000 Subject: Heap allocate client buffers --- src/server/main.zig | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/server/main.zig b/src/server/main.zig index 0efefc3..4f52bec 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -43,6 +43,7 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { defer threaded.deinit(); const io = threaded.io(); + var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( server.info.host, server.info.port, @@ -121,12 +122,14 @@ fn handleConnection( const allocator = client_allocator.allocator(); defer stream.close(io); - var w_buffer: [4096]u8 = undefined; - var writer = stream.writer(io, &w_buffer); + const w_buffer: []u8 = try allocator.alloc(u8, 1024); + defer allocator.free(w_buffer); + var writer = stream.writer(io, w_buffer); const out = &writer.interface; - var r_buffer: [8192]u8 = undefined; - var reader = stream.reader(io, &r_buffer); + const r_buffer: []u8 = try allocator.alloc(u8, 1024); + defer allocator.free(r_buffer); + var reader = stream.reader(io, r_buffer); const in = &reader.interface; var client_state: ClientState = .init(null, in, out); @@ -147,12 +150,7 @@ fn handleConnection( try client_state.send(io, .pong); }, .@"pub" => |pb| { - try server.publishMessage(io, server_allocator, pb); - if (client_state.connect) |c| { - if (c.verbose) { - try client_state.send(io, .@"+ok"); - } - } + _ = io.async(publishMessage, .{ server, io, server_allocator, &client_state, pb }); }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -183,7 +181,14 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } -fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void { +fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *ClientState, msg: Message.Pub) !void { + errdefer { + if (source_client.connect) |c| { + if (c.verbose) { + source_client.send(io, .@"-err") catch {}; + } + } + } defer msg.deinit(gpa); for (server.subscriptions.items) |subscription| { if (subjectMatches(subscription.subject, msg.subject)) { @@ -199,6 +204,11 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Mess } }) catch continue; } } + if (source_client.connect) |c| { + if (c.verbose) { + source_client.send(io, .@"+ok") catch {}; + } + } } fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { -- cgit