diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 18:08:29 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-12 18:08:38 -0500 |
| commit | 9f690fe27a25dc36047e65e83c8a2f4c8372734b (patch) | |
| tree | 21a15dea59d71dbb6f919502911522a91d1bcd9f | |
| parent | 7bcc0c19aa713e2570d2c6b418f6ef27b79b8fda (diff) | |
Add a warning log when producers are stalled
| -rw-r--r-- | src/Server.zig | 13 |
1 files changed, 12 insertions, 1 deletions
diff --git a/src/Server.zig b/src/Server.zig index 7dc3783..25c938b 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -154,6 +154,8 @@ fn handleConnection( ) !void { defer stream.close(io); + const clock: std.Io.Clock = .real; + var dba: std.heap.DebugAllocator(.{}) = .init; dba.backing_allocator = server_allocator; defer _ = dba.deinit(); @@ -208,20 +210,29 @@ fn handleConnection( }, .PUB => { @branchHint(.likely); - // log.debug("received a pub msg", .{}); + const before = try clock.now(io); server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, }; + const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds()); + if (pub_ns > 5 * std.time.ns_per_ms) { + log.warn("Producer was stalled for a total of for {D}", .{pub_ns}); + } }, .HPUB => { @branchHint(.likely); + const before = try clock.now(io); server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) { error.ReadFailed => return reader.err.?, error.EndOfStream => return error.ClientDisconnected, else => |e| return e, }; + const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds()); + if (pub_ns > 5 * std.time.ns_per_ms) { + log.warn("Producer was stalled for a total of for {D}", .{pub_ns}); + } }, .SUB => { server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) { |
