summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 03:07:19 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 05:22:02 +0000
commit4fcb9e3943b899b01bbb959dcf322af1855abf69 (patch)
treedc8ee4bc5ccb535782d7ac73d5a4253a7be2554e
parent0233bc278c263d356d427ca9df48028bfac42515 (diff)
Heap allocate client buffers
-rw-r--r--src/server/main.zig32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index 0efefc3..4f52bec 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -43,6 +43,7 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
defer threaded.deinit();
const io = threaded.io();
+
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -121,12 +122,14 @@ fn handleConnection(
const allocator = client_allocator.allocator();
defer stream.close(io);
- var w_buffer: [4096]u8 = undefined;
- var writer = stream.writer(io, &w_buffer);
+ const w_buffer: []u8 = try allocator.alloc(u8, 1024);
+ defer allocator.free(w_buffer);
+ var writer = stream.writer(io, w_buffer);
const out = &writer.interface;
- var r_buffer: [8192]u8 = undefined;
- var reader = stream.reader(io, &r_buffer);
+ const r_buffer: []u8 = try allocator.alloc(u8, 1024);
+ defer allocator.free(r_buffer);
+ var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
var client_state: ClientState = .init(null, in, out);
@@ -147,12 +150,7 @@ fn handleConnection(
try client_state.send(io, .pong);
},
.@"pub" => |pb| {
- try server.publishMessage(io, server_allocator, pb);
- if (client_state.connect) |c| {
- if (c.verbose) {
- try client_state.send(io, .@"+ok");
- }
- }
+ _ = io.async(publishMessage, .{ server, io, server_allocator, &client_state, pb });
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -183,7 +181,14 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
-fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void {
+fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *ClientState, msg: Message.Pub) !void {
+ errdefer {
+ if (source_client.connect) |c| {
+ if (c.verbose) {
+ source_client.send(io, .@"-err") catch {};
+ }
+ }
+ }
defer msg.deinit(gpa);
for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) {
@@ -199,6 +204,11 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Mess
} }) catch continue;
}
}
+ if (source_client.connect) |c| {
+ if (c.verbose) {
+ source_client.send(io, .@"+ok") catch {};
+ }
+ }
}
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {