summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Server.zig48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 35b93e9..65129a5 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -205,7 +205,6 @@ fn handleConnection(
@branchHint(.likely);
// log.debug("received a pub msg", .{});
server.publishMessage(io, rand, server_allocator, &client, .@"pub") catch |err| switch (err) {
- error.WriteFailed => return writer.err.?,
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
@@ -214,7 +213,6 @@ fn handleConnection(
.HPUB => {
@branchHint(.likely);
server.publishMessage(io, rand, server_allocator, &client, .hpub) catch |err| switch (err) {
- error.WriteFailed => return writer.err.?,
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
@@ -325,10 +323,6 @@ fn publishMessage(
var published_queue_groups: ArrayList([]const u8) = .empty;
defer published_queue_groups.deinit(alloc);
- var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc);
- defer line_writer_allocating.deinit();
- var line_writer = &line_writer_allocating.writer;
-
subs: for (0..server.subscriptions.items.len) |i| {
var subscription = server.subscriptions.items[i];
if (subjectMatches(subscription.subject, msg.subject)) {
@@ -342,25 +336,51 @@ fn publishMessage(
try published_queue_groups.append(alloc, sg);
}
- line_writer_allocating.clearRetainingCapacity();
+ // The rest of this loop is setting up a slice of byte slices to simultaneously
+ // send to the underlying queue.
+ // Each "chunk" is a section of the message to be sent.
+ // The chunk_count starts off at the minimum number of chunks per message, and
+ // then increases as branches add additional chunks.
+ // The msg_chunks_buf.len is the maximum number of chunks in a message.
+ // Each of the appendBounded calls has their error marked as unreachable,
+ // because it is an error for there to be more appendBounded calls than chunks
+ // in the chunks buf.
+ // The reason for this strategy is to avoid any intermediary allocations between
+ // the publishers read buffer, and the subscribers write buffer.
+ var chunk_count: usize = 7;
+ var msg_chunks_buf: [10][]const u8 = undefined;
+ var msg_chunks: ArrayList([]const u8) = .initBuffer(&msg_chunks_buf);
switch (pub_or_hpub) {
- .@"pub" => _ = try line_writer.write("MSG "),
- .hpub => _ = try line_writer.write("HMSG "),
+ .@"pub" => _ = msg_chunks.appendBounded("MSG ") catch unreachable,
+ .hpub => _ = msg_chunks.appendBounded("HMSG ") catch unreachable,
}
- try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid });
+ msg_chunks.appendBounded(msg.subject) catch unreachable;
+ msg_chunks.appendBounded(" ") catch unreachable;
+ msg_chunks.appendBounded(subscription.sid) catch unreachable;
+ msg_chunks.appendBounded(" ") catch unreachable;
if (msg.reply_to) |reply_to| {
- try line_writer.print("{s} ", .{reply_to});
+ chunk_count += 2;
+ msg_chunks.appendBounded(reply_to) catch unreachable;
+ msg_chunks.appendBounded(" ") catch unreachable;
}
switch (pub_or_hpub) {
.hpub => {
- try line_writer.print("{d} ", .{hpubmsg.header_bytes});
+ chunk_count += 1;
+ var hlen_buf: [std.fmt.count("{d} ", .{std.math.maxInt(usize)})]u8 = undefined;
+ msg_chunks.appendBounded(
+ std.fmt.bufPrint(&hlen_buf, "{d} ", .{hpubmsg.header_bytes}) catch unreachable,
+ ) catch unreachable;
},
else => {},
}
- try line_writer.print("{d}\r\n", .{msg.payload.len - 2});
+ var len_buf: [std.fmt.count("{d}\r\n", .{std.math.maxInt(usize)})]u8 = undefined;
+ msg_chunks.appendBounded(
+ std.fmt.bufPrint(&len_buf, "{d}\r\n", .{msg.payload.len - 2}) catch unreachable,
+ ) catch unreachable;
+ msg_chunks.appendBounded(msg.payload) catch unreachable;
- try subscription.send(io, &.{ line_writer.buffered(), msg.payload });
+ try subscription.send(io, msg_chunks.items[0..chunk_count]);
}
}