From fc68749669a3bd9e0530d5958b100262537f142a Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Thu, 1 Jan 2026 19:38:36 +0000 Subject: gracefully exit simplify code clean up dead code --- src/server/main.zig | 126 +++++++++++++++------------------------------------- 1 file changed, 37 insertions(+), 89 deletions(-) (limited to 'src/server/main.zig') diff --git a/src/server/main.zig b/src/server/main.zig index d8e8e61..dab0f0a 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -25,25 +25,45 @@ fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void { keep_running.store(false, .monotonic); } -pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { +pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void { // Configure the signal action - // const act = std.posix.Sigaction{ - // .handler = .{ .handler = handleSigInt }, - // .mask = std.posix.sigemptyset(), - // .flags = 0, - // }; + const act = std.posix.Sigaction{ + .handler = .{ .handler = handleSigInt }, + .mask = std.posix.sigemptyset(), + .flags = 0, + }; - // // Register the handler for SIGINT (Ctrl+C) - // std.posix.sigaction(std.posix.SIG.INT, &act, null); + // Register the handler for SIGINT (Ctrl+C) + std.posix.sigaction(std.posix.SIG.INT, &act, null); - var server: Server = .{ - .info = server_config, - }; + { + var dba: std.heap.DebugAllocator(.{}) = .init; + dba.backing_allocator = alloc; + defer _ = dba.deinit(); + const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else alloc; + + var server: Server = .{ + .info = server_config, + }; + + var threaded: std.Io.Threaded = .init(gpa, .{}); + defer threaded.deinit(); + const io = threaded.io(); - var threaded: std.Io.Threaded = .init(gpa, .{}); - defer threaded.deinit(); - const io = threaded.io(); + var server_task = try io.concurrent(start, .{ &server, io, gpa }); + defer server_task.cancel(io) catch {}; + + while (keep_running.load(.monotonic)) { + try io.sleep(.fromMilliseconds(1), .awake); + } + + std.debug.print("\nShutting down...\n", .{}); + server_task.cancel(io) catch {}; + } + std.debug.print("Goodbye\n", .{}); +} +pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( server.info.host, server.info.port, @@ -52,46 +72,20 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { var id: usize = 0; // Run until SIGINT is handled, then exit gracefully - while (keep_running.load(.monotonic)) : (id +%= 1) { + while (true) : (id +%= 1) { std.debug.print("in server loop\n", .{}); if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); std.debug.print("accepted connection\n", .{}); - _ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch { + _ = io.concurrent(handleConnection, .{ server, gpa, io, id, stream }) catch { std.debug.print("could not start concurrent handler for {d}\n", .{id}); stream.close(io); }; } - - std.debug.print("Exiting gracefully\n", .{}); -} - -fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void { - while (true) { - const msg = server.msg_queue.getOne(io) catch break; - defer msg.deinit(alloc); - - for (server.subscriptions.items) |subscription| { - if (subjectMatches(subscription.subject, msg.subject)) { - const client = server.clients.get(subscription.client_id) orelse { - std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); - continue; - }; - client.send(io, .{ .msg = .{ - .subject = msg.subject, - .sid = subscription.sid, - .reply_to = msg.reply_to, - .payload = msg.payload, - } }) catch continue; - } - } - } } fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void { - // server.clients.lockPointers(); try server.clients.put(allocator, id, client); - // server.clients.unlockPointers(); } fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void { @@ -185,13 +179,6 @@ fn handleConnection( } } -// // Result is owned by the caller -// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState { -// var acc: std.ArrayList(ClientState) = .empty; - -// return acc.toOwnedSlice(); -// } - fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } @@ -200,7 +187,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ errdefer { if (source_client.connect) |c| { if (c.verbose) { - source_client.send(io, .@"-err") catch {}; + source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; } } } @@ -259,42 +246,3 @@ pub fn createId() []const u8 { pub fn createName() []const u8 { return "SERVERNAME"; } - -// TESTING - -// fn initTestServer() Server { -// return .{ -// .info = .{ -// .server_id = "ABCD", -// .server_name = "test server", -// .version = "0.1.2", -// .max_payload = 1234, -// }, -// }; -// } - -// fn initTestClient( -// io: std.Io, -// allocator: std.mem.Allocator, -// id: usize, -// data_from: []const u8, -// ) !struct { -// Client, -// *std.Io.Reader, -// *std.Io.Writer, -// } { -// return .init(io, allocator, id, .{}, in, out); -// } - -// test { -// const gpa = std.testing.allocator; -// const io = std.testing.io; - -// const server = initTestServer(); -// const client: Client = .init( -// io, -// gpa, -// 1, -// .{}, -// ); -// } -- cgit