summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/main.zig12
-rw-r--r--src/server/message_parser.zig12
2 files changed, 17 insertions, 7 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index aad842c..fca0dee 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -233,12 +233,12 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
continue;
};
- client.send(io, .{ .msg = .{
- .subject = try alloc.dupe(u8, msg.subject),
- .sid = try alloc.dupe(u8, subscription.sid),
- .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
- .payload = try alloc.dupe(u8, msg.payload),
- } }) catch continue;
+ client.send(io, .{
+ .msg = try msg.toMsg(alloc, subscription.sid),
+ }) catch |err| switch (err) {
+ error.Canceled => return err,
+ else => {},
+ };
}
}
if (source_client.connect) |c| {
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index bd14ec7..68e7a20 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -137,6 +137,16 @@ pub const Message = union(MessageType) {
alloc.free(self.payload);
if (self.reply_to) |r| alloc.free(r);
}
+
+ pub fn toMsg(self: Pub, alloc: std.mem.Allocator, sid: []const u8) !Msg {
+ const res: Msg = .{
+ .subject = self.subject,
+ .sid = sid,
+ .reply_to = self.reply_to,
+ .payload = self.payload,
+ };
+ return res.dupe(alloc);
+ }
};
pub const Sub = struct {
/// The subject name to subscribe to.
@@ -177,7 +187,7 @@ pub const Message = union(MessageType) {
errdefer alloc.free(res.subject);
res.sid = try alloc.dupe(u8, self.sid);
errdefer alloc.free(res.sid);
- res.reply_to = if (self.reply_to) |r| alloc.dupe(u8, r) else null;
+ res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
errdefer if (res.reply_to) |r| alloc.free(r);
res.payload = try alloc.dupe(u8, self.payload);
errdefer alloc.free(res.payload);