diff options
| -rw-r--r-- | src/Server.zig | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/src/Server.zig b/src/Server.zig index 35b93e9..65129a5 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -205,7 +205,6 @@ fn handleConnection( @branchHint(.likely); // log.debug("received a pub msg", .{}); server.publishMessage(io, rand, server_allocator, &client, .@"pub") catch |err| switch (err) { - error.WriteFailed => return writer.err.?, error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -214,7 +213,6 @@ fn handleConnection( .HPUB => { @branchHint(.likely); server.publishMessage(io, rand, server_allocator, &client, .hpub) catch |err| switch (err) { - error.WriteFailed => return writer.err.?, error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, @@ -325,10 +323,6 @@ fn publishMessage( var published_queue_groups: ArrayList([]const u8) = .empty; defer published_queue_groups.deinit(alloc); - var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc); - defer line_writer_allocating.deinit(); - var line_writer = &line_writer_allocating.writer; - subs: for (0..server.subscriptions.items.len) |i| { var subscription = server.subscriptions.items[i]; if (subjectMatches(subscription.subject, msg.subject)) { @@ -342,25 +336,51 @@ fn publishMessage( try published_queue_groups.append(alloc, sg); } - line_writer_allocating.clearRetainingCapacity(); + // The rest of this loop is setting up a slice of byte slices to simultaneously + // send to the underlying queue. + // Each "chunk" is a section of the message to be sent. + // The chunk_count starts off at the minimum number of chunks per message, and + // then increases as branches add additional chunks. + // The msg_chunks_buf.len is the maximum number of chunks in a message. + // Each of the appendBounded calls has their error marked as unreachable, + // because it is an error for there to be more appendBounded calls than chunks + // in the chunks buf. + // The reason for this strategy is to avoid any intermediary allocations between + // the publishers read buffer, and the subscribers write buffer. + var chunk_count: usize = 7; + var msg_chunks_buf: [10][]const u8 = undefined; + var msg_chunks: ArrayList([]const u8) = .initBuffer(&msg_chunks_buf); switch (pub_or_hpub) { - .@"pub" => _ = try line_writer.write("MSG "), - .hpub => _ = try line_writer.write("HMSG "), + .@"pub" => _ = msg_chunks.appendBounded("MSG ") catch unreachable, + .hpub => _ = msg_chunks.appendBounded("HMSG ") catch unreachable, } - try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid }); + msg_chunks.appendBounded(msg.subject) catch unreachable; + msg_chunks.appendBounded(" ") catch unreachable; + msg_chunks.appendBounded(subscription.sid) catch unreachable; + msg_chunks.appendBounded(" ") catch unreachable; if (msg.reply_to) |reply_to| { - try line_writer.print("{s} ", .{reply_to}); + chunk_count += 2; + msg_chunks.appendBounded(reply_to) catch unreachable; + msg_chunks.appendBounded(" ") catch unreachable; } switch (pub_or_hpub) { .hpub => { - try line_writer.print("{d} ", .{hpubmsg.header_bytes}); + chunk_count += 1; + var hlen_buf: [std.fmt.count("{d} ", .{std.math.maxInt(usize)})]u8 = undefined; + msg_chunks.appendBounded( + std.fmt.bufPrint(&hlen_buf, "{d} ", .{hpubmsg.header_bytes}) catch unreachable, + ) catch unreachable; }, else => {}, } - try line_writer.print("{d}\r\n", .{msg.payload.len - 2}); + var len_buf: [std.fmt.count("{d}\r\n", .{std.math.maxInt(usize)})]u8 = undefined; + msg_chunks.appendBounded( + std.fmt.bufPrint(&len_buf, "{d}\r\n", .{msg.payload.len - 2}) catch unreachable, + ) catch unreachable; + msg_chunks.appendBounded(msg.payload) catch unreachable; - try subscription.send(io, &.{ line_writer.buffered(), msg.payload }); + try subscription.send(io, msg_chunks.items[0..chunk_count]); } } |
