summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-12 18:08:29 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-12 18:08:38 -0500
commit9f690fe27a25dc36047e65e83c8a2f4c8372734b (patch)
tree21a15dea59d71dbb6f919502911522a91d1bcd9f
parent7bcc0c19aa713e2570d2c6b418f6ef27b79b8fda (diff)
Add a warning log when producers are stalled
-rw-r--r--src/Server.zig13
1 files changed, 12 insertions, 1 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 7dc3783..25c938b 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -154,6 +154,8 @@ fn handleConnection(
) !void {
defer stream.close(io);
+ const clock: std.Io.Clock = .real;
+
var dba: std.heap.DebugAllocator(.{}) = .init;
dba.backing_allocator = server_allocator;
defer _ = dba.deinit();
@@ -208,20 +210,29 @@ fn handleConnection(
},
.PUB => {
@branchHint(.likely);
- // log.debug("received a pub msg", .{});
+ const before = try clock.now(io);
server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) {
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
};
+ const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds());
+ if (pub_ns > 5 * std.time.ns_per_ms) {
+ log.warn("Producer was stalled for a total of for {D}", .{pub_ns});
+ }
},
.HPUB => {
@branchHint(.likely);
+ const before = try clock.now(io);
server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) {
error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected,
else => |e| return e,
};
+ const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds());
+ if (pub_ns > 5 * std.time.ns_per_ms) {
+ log.warn("Producer was stalled for a total of for {D}", .{pub_ns});
+ }
},
.SUB => {
server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) {