diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-08 11:48:12 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-08 16:47:52 -0500 |
| commit | d8488fde4902565f4ac8519565f234918dab6b11 (patch) | |
| tree | b50e523a32f4ccab4c5ec093a5dce24250f5b7b8 /src/Server.zig | |
| parent | 45feccbad8c7306c15137a6003f3df1183d9c2a9 (diff) | |
support hpub
fixed issue where not all data was being sent
request reply has a performance issue but technically works
Diffstat (limited to 'src/Server.zig')
| -rw-r--r-- | src/Server.zig | 66 |
1 files changed, 30 insertions, 36 deletions
diff --git a/src/Server.zig b/src/Server.zig index b5f9ee9..4ae959f 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -281,9 +281,15 @@ fn publishMessage( } }; - _ = pub_or_hpub; + const hpubmsg = switch (pub_or_hpub) { + .@"pub" => {}, + .hpub => try parse.hpub(source_client.from_client), + }; - const msg = try parse.@"pub"(source_client.from_client); + const msg: Message.Pub = switch (pub_or_hpub) { + .@"pub" => try parse.@"pub"(source_client.from_client), + .hpub => hpubmsg.@"pub", + }; // const subject = switch (pub_or_hpub) { // .PUB => |pb| pb.subject, @@ -297,6 +303,10 @@ fn publishMessage( var published_queue_sub_idxs: ArrayList(usize) = .empty; defer published_queue_sub_idxs.deinit(alloc); + var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc); + defer line_writer_allocating.deinit(); + var line_writer = &line_writer_allocating.writer; + subs: for (0..server.subscriptions.items.len) |i| { const subscription = server.subscriptions.items[i]; if (subjectMatches(subscription.subject, msg.subject)) { @@ -313,45 +323,29 @@ fn publishMessage( try published_queue_sub_idxs.append(alloc, i); } - const m = msg.toMsg(subscription.sid); - var msg_line_buf: [1024]u8 = undefined; - var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf); - - // try self.to_client.print( - // , - - // ); - // try m.payload.write(self.to_client); - // try self.to_client.print("\r\n", .{}); - try msg_line_writer.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, + line_writer_allocating.clearRetainingCapacity(); + + switch (pub_or_hpub) { + .@"pub" => _ = try line_writer.write("MSG "), + .hpub => _ = try line_writer.write("HMSG "), + } + try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid }); + if (msg.reply_to) |reply_to| { + try line_writer.print("{s} ", .{reply_to}); + } + switch (pub_or_hpub) { + .hpub => { + try line_writer.print("{d} ", .{hpubmsg.header_bytes}); }, - ); + else => {}, + } + try line_writer.print("{d}\r\n", .{msg.payload.len}); try subscription.queue_lock.lock(io); defer subscription.queue_lock.unlock(io); - try subscription.queue.putAll(io, msg_line_writer.buffered()); - try subscription.queue.putAll(io, m.payload); + try subscription.queue.putAll(io, line_writer.buffered()); + try subscription.queue.putAll(io, msg.payload); try subscription.queue.putAll(io, "\r\n"); - - // switch (msg) { - // .PUB => |pb| { - // try subscription.queue.putOne(io, .{ - // .MSG = try pb.toMsg(subscription.alloc, subscription.sid), - // }); - // }, - // .HPUB => |hp| { - // try subscription.queue.putOne(io, .{ - // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid), - // }); - // }, - // else => unreachable, - // } } } |
