From c9b1cbfa35f1486e6c091123e595959c0b748233 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Fri, 9 Jan 2026 23:49:10 -0500 Subject: starting minimizing the number of queue puts --- src/Server.zig | 20 +++++-------------- src/Server/message.zig | 4 ++-- src/Server/parse.zig | 52 +++++++++++++++++++++++++------------------------- 3 files changed, 33 insertions(+), 43 deletions(-) diff --git a/src/Server.zig b/src/Server.zig index 65129a5..eff61d1 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -347,8 +347,8 @@ fn publishMessage( // in the chunks buf. // The reason for this strategy is to avoid any intermediary allocations between // the publishers read buffer, and the subscribers write buffer. - var chunk_count: usize = 7; - var msg_chunks_buf: [10][]const u8 = undefined; + var chunk_count: usize = 6; + var msg_chunks_buf: [9][]const u8 = undefined; var msg_chunks: ArrayList([]const u8) = .initBuffer(&msg_chunks_buf); switch (pub_or_hpub) { @@ -364,20 +364,10 @@ fn publishMessage( msg_chunks.appendBounded(reply_to) catch unreachable; msg_chunks.appendBounded(" ") catch unreachable; } - switch (pub_or_hpub) { - .hpub => { - chunk_count += 1; - var hlen_buf: [std.fmt.count("{d} ", .{std.math.maxInt(usize)})]u8 = undefined; - msg_chunks.appendBounded( - std.fmt.bufPrint(&hlen_buf, "{d} ", .{hpubmsg.header_bytes}) catch unreachable, - ) catch unreachable; - }, - else => {}, + if (pub_or_hpub == .hpub) { + chunk_count += 1; + msg_chunks.appendBounded(hpubmsg.header_bytes) catch unreachable; } - var len_buf: [std.fmt.count("{d}\r\n", .{std.math.maxInt(usize)})]u8 = undefined; - msg_chunks.appendBounded( - std.fmt.bufPrint(&len_buf, "{d}\r\n", .{msg.payload.len - 2}) catch unreachable, - ) catch unreachable; msg_chunks.appendBounded(msg.payload) catch unreachable; try subscription.send(io, msg_chunks.items[0..chunk_count]); diff --git a/src/Server/message.zig b/src/Server/message.zig index 8005453..0b02892 100644 --- a/src/Server/message.zig +++ b/src/Server/message.zig @@ -120,7 +120,7 @@ pub const Message = union(enum) { } }; pub const HPub = struct { - header_bytes: usize, + header_bytes: []const u8, @"pub": Pub, pub fn deinit(self: HPub, alloc: Allocator) void { @@ -136,7 +136,7 @@ pub const Message = union(enum) { }; pub const HMsg = struct { - header_bytes: usize, + header_bytes: []const u8, msg: Msg, pub fn deinit(self: HMsg, alloc: Allocator) void { diff --git a/src/Server/parse.zig b/src/Server/parse.zig index cda5985..46dc239 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -145,21 +145,22 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub { try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len); continue; } - in.toss(iter.index + "\r\n".len); + in.toss(iter.index - second.len); return .{ .subject = subject, .reply_to = null, - .payload = in.take(bytes + 2) catch unreachable, + .payload = in.take(bytes + 5) catch unreachable, }; } switch (in.buffered()[iter.index]) { '\t', ' ' => { const reply_to = second; - const bytes = parseUnsigned(usize, iter.next() orelse { + const bytes_str = iter.next() orelse { try in.fillMore(); continue; - }, 10) catch return error.InvalidStream; + }; + const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') { try in.fillMore(); @@ -171,11 +172,11 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub { continue; } - in.toss(iter.index + "\r\n".len); + in.toss(iter.index - bytes_str.len); return .{ .subject = subject, .reply_to = reply_to, - .payload = in.take(bytes + 2) catch unreachable, + .payload = in.take(bytes + 5) catch unreachable, }; }, else => {}, @@ -195,7 +196,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = "bar", - .payload = "hi\r\n", + .payload = "2\r\nhi\r\n", }, try @"pub"(&in.interface), ); @@ -210,7 +211,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi\r\n", + .payload = "2\r\nhi\r\n", }, try @"pub"(&in.interface), ); @@ -226,7 +227,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi\r\n", + .payload = "2\r\nhi\r\n", }, try @"pub"(&in.interface), ); @@ -243,7 +244,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi\r\n", + .payload = "2\r\nhi\r\n", }, try @"pub"(&in.interface), ); @@ -261,7 +262,7 @@ test @"pub" { Message.Pub{ .subject = "foo", .reply_to = null, - .payload = "hi\r\n", + .payload = "2\r\nhi\r\n", }, try @"pub"(&in.interface), ); @@ -540,7 +541,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub { const header_bytes_str = second; const total_bytes_str = third; - const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; + _ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream; if (in.buffered().len < iter.index + total_bytes + 4) { @@ -548,14 +549,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub { continue; } - // 4 bytes for CRLF on either side of headers and payload. - in.toss(iter.index + 2); + in.toss(iter.index - total_bytes_str.len); return .{ - .header_bytes = header_bytes, + .header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1], .@"pub" = .{ .subject = subject, .reply_to = null, - .payload = in.take(total_bytes + 2) catch unreachable, + .payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable, }, }; } @@ -565,7 +565,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub { const header_bytes_str = third; if (iter.next()) |total_bytes_str| { if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') { - const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; + _ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream; const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream; if (in.buffered().len < iter.index + total_bytes + 4) { @@ -573,14 +573,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub { continue; } - // 4 bytes for CRLF on either side of headers and payload. - in.toss(iter.index + 2); + in.toss(iter.index - total_bytes_str.len); return .{ - .header_bytes = header_bytes, + .header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1], .@"pub" = .{ .subject = subject, .reply_to = reply_to, - .payload = in.take(total_bytes + 2) catch unreachable, + .payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable, }, }; } @@ -599,16 +598,17 @@ test hpub { var in: std.testing.Reader = .init(&buf, &.{ .{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" }, }); + const res = try hpub(&in.interface); try std.testing.expectEqualDeep( Message.HPub{ - .header_bytes = 22, + .header_bytes = "22 ", .@"pub" = .{ .subject = "foo", .reply_to = null, - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", + .payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, - try hpub(&in.interface), + res, ); try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } @@ -619,11 +619,11 @@ test hpub { }); try std.testing.expectEqualDeep( Message.HPub{ - .header_bytes = 22, + .header_bytes = "22 ", .@"pub" = .{ .subject = "foo", .reply_to = "reply", - .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", + .payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n", }, }, try hpub(&in.interface), -- cgit