From e50d53ee7ee044df88fa0b31a998ef17eb7e7efa Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Mon, 5 Jan 2026 00:23:43 -0500 Subject: Add Payload type stores short message buffers in a colocated array, overflowing to an allocated slice when needed. --- src/server/Client.zig | 10 +++--- src/server/Server.zig | 2 +- src/server/message_parser.zig | 75 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/server/Client.zig b/src/server/Client.zig index 8ff92e8..4eceb89 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -72,25 +72,27 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { switch (msg) { .MSG => |m| { try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", + "MSG {s} {s} {s} {d}\r\n", .{ m.subject, m.sid, m.reply_to orelse "", m.payload.len, - m.payload, }, ); + try m.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); }, .HMSG => |hmsg| { - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ hmsg.msg.subject, hmsg.msg.sid, hmsg.msg.reply_to orelse "", hmsg.header_bytes, hmsg.msg.payload.len, - hmsg.msg.payload, }); + try hmsg.msg.payload.write(self.to_client); + try self.to_client.print("\r\n", .{}); }, } } diff --git a/src/server/Server.zig b/src/server/Server.zig index 3fedfcb..6b1eda5 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -158,7 +158,7 @@ fn handleConnection( var recv_queue: Queue(Message) = .init(qbuf); defer recv_queue.close(io); - const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128)); + const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / @sizeOf(Msgs)); defer server_allocator.free(mbuf); var msgs_queue: Queue(Msgs) = .init(mbuf); defer { diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index ff1a573..81b3858 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -5,7 +5,8 @@ const ArrayList = std.ArrayList; const StaticStringMap = std.StaticStringMap; const Io = std.Io; -const AllocatingWriter = Io.Writer.Allocating; +const Writer = Io.Writer; +const AllocatingWriter = Writer.Allocating; const Reader = Io.Reader; const ascii = std.ascii; @@ -17,6 +18,53 @@ const parseUnsigned = std.fmt.parseUnsigned; const log = std.log; +pub const Payload = struct { + len: u32, + short: [128]u8, + long: ?[]u8, + + pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload { + var res: Payload = .{ + .len = @intCast(bytes), + .short = undefined, + .long = null, + }; + + try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]); + if (bytes > res.short.len) { + const long = try alloc.alloc(u8, bytes - res.short.len); + errdefer alloc.free(long); + try in.readSliceAll(long); + res.long = long; + } + return res; + } + + pub fn write(self: Payload, out: *Writer) !void { + std.debug.assert(out.buffer.len >= self.short.len); + std.debug.assert(self.len <= self.short.len or self.long != null); + try out.writeAll(self.short[0..@min(self.len, self.short.len)]); + if (self.long) |l| { + try out.writeAll(l); + } + } + + pub fn deinit(self: Payload, alloc: Allocator) void { + if (self.long) |l| { + alloc.free(l); + } + } + + pub fn dupe(self: Payload, alloc: Allocator) !Payload { + var res = self; + if (self.long) |l| { + res.long = try alloc.dupe(u8, l); + } + errdefer if (res.long) |l| alloc.free(l); + return res; + } +}; + pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const Message = union(enum) { @@ -118,11 +166,11 @@ pub const Message = union(enum) { /// The reply subject that subscribers can use to send a response back to the publisher/requestor. reply_to: ?[]const u8 = null, /// The message payload data. - payload: []const u8, + payload: Payload, pub fn deinit(self: Pub, alloc: Allocator) void { alloc.free(self.subject); - alloc.free(self.payload); + self.payload.deinit(alloc); if (self.reply_to) |r| alloc.free(r); } @@ -163,6 +211,7 @@ pub const Message = union(enum) { pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { var res = self; res.msg = try self.msg.dupe(alloc); + errdefer alloc.free(res.msg); return res; } }; @@ -194,13 +243,13 @@ pub const Message = union(enum) { subject: []const u8, sid: []const u8, reply_to: ?[]const u8, - payload: []const u8, + payload: Payload, pub fn deinit(self: Msg, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); if (self.reply_to) |r| alloc.free(r); - alloc.free(self.payload); + self.payload.deinit(alloc); } pub fn dupe(self: Msg, alloc: Allocator) !Msg { @@ -211,7 +260,7 @@ pub const Message = union(enum) { errdefer alloc.free(res.sid); res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try alloc.dupe(u8, self.payload); + res.payload = try self.payload.dupe(alloc); errdefer alloc.free(res.payload); return res; } @@ -613,8 +662,6 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { defer second.deinit(alloc); var third: ?ArrayList(u8) = null; defer if (third) |*t| t.deinit(alloc); - var payload: AllocatingWriter = .init(alloc); - errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { @@ -671,13 +718,14 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { try parseUnsigned(usize, second.items, 10), }; - try in.streamExact(&payload.writer, bytes); + const payload: Payload = try .read(alloc, in, bytes); + errdefer payload.deinit(alloc); try expectStreamBytes(in, "\r\n"); return .{ .PUB = .{ .subject = subject, - .payload = try payload.toOwnedSlice(), + .payload = payload, .reply_to = reply_to, }, }; @@ -763,8 +811,6 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { defer third.deinit(alloc); var fourth: ?ArrayList(u8) = null; defer if (fourth) |*f| f.deinit(alloc); - var payload: AllocatingWriter = .init(alloc); - errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { @@ -843,7 +889,8 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { try parseUnsigned(usize, third.items, 10), }; - try in.streamExact(&payload.writer, total_bytes); + const payload: Payload = try .read(alloc, in, total_bytes); + errdefer payload.deinit(alloc); try expectStreamBytes(in, "\r\n"); return .{ @@ -851,7 +898,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { .header_bytes = header_bytes, .@"pub" = .{ .subject = subject, - .payload = try payload.toOwnedSlice(), + .payload = payload, .reply_to = reply_to, }, }, -- cgit