summaryrefslogtreecommitdiff
path: root/src/Server
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-09 23:49:10 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-10 09:21:18 -0500
commitc9b1cbfa35f1486e6c091123e595959c0b748233 (patch)
treed855156f2458d75b73492bfdd6a99d498dd7bcdf /src/Server
parent78b23ee59c6df29b043df565111f54afd963a61c (diff)
starting minimizing the number of queue putspush-zvnkkqszuxxp
Diffstat (limited to 'src/Server')
-rw-r--r--src/Server/message.zig4
-rw-r--r--src/Server/parse.zig52
2 files changed, 28 insertions, 28 deletions
diff --git a/src/Server/message.zig b/src/Server/message.zig
index 8005453..0b02892 100644
--- a/src/Server/message.zig
+++ b/src/Server/message.zig
@@ -120,7 +120,7 @@ pub const Message = union(enum) {
}
};
pub const HPub = struct {
- header_bytes: usize,
+ header_bytes: []const u8,
@"pub": Pub,
pub fn deinit(self: HPub, alloc: Allocator) void {
@@ -136,7 +136,7 @@ pub const Message = union(enum) {
};
pub const HMsg = struct {
- header_bytes: usize,
+ header_bytes: []const u8,
msg: Msg,
pub fn deinit(self: HMsg, alloc: Allocator) void {
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index cda5985..46dc239 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -145,21 +145,22 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
continue;
}
- in.toss(iter.index + "\r\n".len);
+ in.toss(iter.index - second.len);
return .{
.subject = subject,
.reply_to = null,
- .payload = in.take(bytes + 2) catch unreachable,
+ .payload = in.take(bytes + 5) catch unreachable,
};
}
switch (in.buffered()[iter.index]) {
'\t', ' ' => {
const reply_to = second;
- const bytes = parseUnsigned(usize, iter.next() orelse {
+ const bytes_str = iter.next() orelse {
try in.fillMore();
continue;
- }, 10) catch return error.InvalidStream;
+ };
+ const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;
if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') {
try in.fillMore();
@@ -171,11 +172,11 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
continue;
}
- in.toss(iter.index + "\r\n".len);
+ in.toss(iter.index - bytes_str.len);
return .{
.subject = subject,
.reply_to = reply_to,
- .payload = in.take(bytes + 2) catch unreachable,
+ .payload = in.take(bytes + 5) catch unreachable,
};
},
else => {},
@@ -195,7 +196,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = "bar",
- .payload = "hi\r\n",
+ .payload = "2\r\nhi\r\n",
},
try @"pub"(&in.interface),
);
@@ -210,7 +211,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi\r\n",
+ .payload = "2\r\nhi\r\n",
},
try @"pub"(&in.interface),
);
@@ -226,7 +227,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi\r\n",
+ .payload = "2\r\nhi\r\n",
},
try @"pub"(&in.interface),
);
@@ -243,7 +244,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi\r\n",
+ .payload = "2\r\nhi\r\n",
},
try @"pub"(&in.interface),
);
@@ -261,7 +262,7 @@ test @"pub" {
Message.Pub{
.subject = "foo",
.reply_to = null,
- .payload = "hi\r\n",
+ .payload = "2\r\nhi\r\n",
},
try @"pub"(&in.interface),
);
@@ -540,7 +541,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
const header_bytes_str = second;
const total_bytes_str = third;
- const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
+ _ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
if (in.buffered().len < iter.index + total_bytes + 4) {
@@ -548,14 +549,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
continue;
}
- // 4 bytes for CRLF on either side of headers and payload.
- in.toss(iter.index + 2);
+ in.toss(iter.index - total_bytes_str.len);
return .{
- .header_bytes = header_bytes,
+ .header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1],
.@"pub" = .{
.subject = subject,
.reply_to = null,
- .payload = in.take(total_bytes + 2) catch unreachable,
+ .payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable,
},
};
}
@@ -565,7 +565,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
const header_bytes_str = third;
if (iter.next()) |total_bytes_str| {
if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
- const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
+ _ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
if (in.buffered().len < iter.index + total_bytes + 4) {
@@ -573,14 +573,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
continue;
}
- // 4 bytes for CRLF on either side of headers and payload.
- in.toss(iter.index + 2);
+ in.toss(iter.index - total_bytes_str.len);
return .{
- .header_bytes = header_bytes,
+ .header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1],
.@"pub" = .{
.subject = subject,
.reply_to = reply_to,
- .payload = in.take(total_bytes + 2) catch unreachable,
+ .payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable,
},
};
}
@@ -599,16 +598,17 @@ test hpub {
var in: std.testing.Reader = .init(&buf, &.{
.{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
});
+ const res = try hpub(&in.interface);
try std.testing.expectEqualDeep(
Message.HPub{
- .header_bytes = 22,
+ .header_bytes = "22 ",
.@"pub" = .{
.subject = "foo",
.reply_to = null,
- .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
+ .payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
},
},
- try hpub(&in.interface),
+ res,
);
try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
@@ -619,11 +619,11 @@ test hpub {
});
try std.testing.expectEqualDeep(
Message.HPub{
- .header_bytes = 22,
+ .header_bytes = "22 ",
.@"pub" = .{
.subject = "foo",
.reply_to = "reply",
- .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
+ .payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
},
},
try hpub(&in.interface),