From f554e7a3bb472c2a8b9e123a7f8ca19a036ba4ac Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sat, 24 Jan 2026 21:31:33 -0500 Subject: Kill process after 10 messages or 3 seconds --- build.zig.zon | 2 +- src/main.zig | 61 +++++++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/build.zig.zon b/build.zig.zon index 4fa51c1..2eec9b7 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -10,7 +10,7 @@ // This is a [Semantic Version](https://semver.org/). // In a future version of Zig it will be used for package deduplication. - .version = "0.0.0", + .version = "0.1.0", // Together with name, this represents a globally unique package // identifier. This field is generated by the Zig toolchain when the diff --git a/src/main.zig b/src/main.zig index 5c4f609..07fe9e2 100644 --- a/src/main.zig +++ b/src/main.zig @@ -157,35 +157,59 @@ pub fn main(init: std.process.Init) !void { continue; }; - const child = std.process.spawn(init.io, .{ + var child = std.process.spawn(init.io, .{ .argv = &.{ "bash", "-c", connection_payload }, .stdout = .pipe, - .stderr = .pipe, + .stderr = .ignore, + .stdin = .ignore, }) catch continue; - var child_stdout: std.ArrayList(u8) = .empty; - defer child_stdout.deinit(init.gpa); - var child_stderr: std.ArrayList(u8) = .empty; - defer child_stderr.deinit(init.gpa); + var child_output_buf: [SaprusClient.max_payload_len]u8 = undefined; + var child_output_reader = child.stdout.?.reader(init.io, &child_output_buf); - child.collectOutput(init.gpa, &child_stdout, &child_stderr, std.math.maxInt(usize)) catch |err| { - log.debug("Failed to collect output: {t}", .{err}); - continue; - }; + var is_killed: std.atomic.Value(bool) = .init(false); + + var kill_task = try init.io.concurrent(killProcessAfter, .{ init.io, &child, .fromSeconds(3), &is_killed }); + defer _ = kill_task.cancel(init.io) catch {}; var cmd_output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined; var cmd_output: Writer = .fixed(&cmd_output_buf); - var cmd_output_window_iter = std.mem.window(u8, child_stdout.items, SaprusClient.max_payload_len, SaprusClient.max_payload_len); - while (cmd_output_window_iter.next()) |chunk| { + // Maximum of 10 messages of output per command + for (0..10) |_| { cmd_output.end = 0; - // Unreachable because the cmd_output_buf is twice the size of the chunk. - cmd_output.print("{b64}", .{chunk}) catch unreachable; + + child_output_reader.interface.fill(child_output_reader.interface.buffer.len) catch |err| switch (err) { + error.ReadFailed => continue :next_message, // TODO: check if there is a better way to handle this + error.EndOfStream => { + cmd_output.print("{b64}", .{child_output_reader.interface.buffered()}) catch unreachable; + if (cmd_output.end > 0) { + connection.send(init.io, cmd_output.buffered()) catch |e| { + log.debug("Failed to send connection chunk: {t}", .{e}); + continue :next_message; + }; + } + break; + }, + }; + cmd_output.print("{b64}", .{try child_output_reader.interface.takeArray(child_output_buf.len)}) catch unreachable; connection.send(init.io, cmd_output.buffered()) catch |err| { log.debug("Failed to send connection chunk: {t}", .{err}); continue :next_message; }; try init.io.sleep(.fromMilliseconds(40), .boot); + } else { + kill_task.cancel(init.io) catch {}; + killProcessAfter(init.io, &child, .zero, &is_killed) catch |err| { + log.debug("Failed to kill process??? {t}", .{err}); + continue :next_message; + }; + } + + if (!is_killed.load(.monotonic)) { + _ = child.wait(init.io) catch |err| { + log.debug("Failed to wait for child: {t}", .{err}); + }; } } } @@ -194,6 +218,15 @@ pub fn main(init: std.process.Init) !void { unreachable; } +fn killProcessAfter(io: std.Io, proc: *std.process.Child, duration: std.Io.Duration, is_killed: *std.atomic.Value(bool)) !void { + io.sleep(duration, .boot) catch |err| switch (err) { + error.Canceled => return, + else => |e| return e, + }; + is_killed.store(true, .monotonic); + proc.kill(io); +} + fn parseDest(in: ?[]const u8) [4]u8 { if (in) |dest| { if (dest.len <= 4) { -- cgit