summaryrefslogtreecommitdiff
path: root/src/Server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server.zig')
-rw-r--r--src/Server.zig66
1 files changed, 30 insertions, 36 deletions
diff --git a/src/Server.zig b/src/Server.zig
index b5f9ee9..4ae959f 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -281,9 +281,15 @@ fn publishMessage(
}
};
- _ = pub_or_hpub;
+ const hpubmsg = switch (pub_or_hpub) {
+ .@"pub" => {},
+ .hpub => try parse.hpub(source_client.from_client),
+ };
- const msg = try parse.@"pub"(source_client.from_client);
+ const msg: Message.Pub = switch (pub_or_hpub) {
+ .@"pub" => try parse.@"pub"(source_client.from_client),
+ .hpub => hpubmsg.@"pub",
+ };
// const subject = switch (pub_or_hpub) {
// .PUB => |pb| pb.subject,
@@ -297,6 +303,10 @@ fn publishMessage(
var published_queue_sub_idxs: ArrayList(usize) = .empty;
defer published_queue_sub_idxs.deinit(alloc);
+ var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc);
+ defer line_writer_allocating.deinit();
+ var line_writer = &line_writer_allocating.writer;
+
subs: for (0..server.subscriptions.items.len) |i| {
const subscription = server.subscriptions.items[i];
if (subjectMatches(subscription.subject, msg.subject)) {
@@ -313,45 +323,29 @@ fn publishMessage(
try published_queue_sub_idxs.append(alloc, i);
}
- const m = msg.toMsg(subscription.sid);
- var msg_line_buf: [1024]u8 = undefined;
- var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf);
-
- // try self.to_client.print(
- // ,
-
- // );
- // try m.payload.write(self.to_client);
- // try self.to_client.print("\r\n", .{});
- try msg_line_writer.print(
- "MSG {s} {s} {s} {d}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
+ line_writer_allocating.clearRetainingCapacity();
+
+ switch (pub_or_hpub) {
+ .@"pub" => _ = try line_writer.write("MSG "),
+ .hpub => _ = try line_writer.write("HMSG "),
+ }
+ try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid });
+ if (msg.reply_to) |reply_to| {
+ try line_writer.print("{s} ", .{reply_to});
+ }
+ switch (pub_or_hpub) {
+ .hpub => {
+ try line_writer.print("{d} ", .{hpubmsg.header_bytes});
},
- );
+ else => {},
+ }
+ try line_writer.print("{d}\r\n", .{msg.payload.len});
try subscription.queue_lock.lock(io);
defer subscription.queue_lock.unlock(io);
- try subscription.queue.putAll(io, msg_line_writer.buffered());
- try subscription.queue.putAll(io, m.payload);
+ try subscription.queue.putAll(io, line_writer.buffered());
+ try subscription.queue.putAll(io, msg.payload);
try subscription.queue.putAll(io, "\r\n");
-
- // switch (msg) {
- // .PUB => |pb| {
- // try subscription.queue.putOne(io, .{
- // .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
- // });
- // },
- // .HPUB => |hp| {
- // try subscription.queue.putOne(io, .{
- // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
- // });
- // },
- // else => unreachable,
- // }
}
}