summaryrefslogtreecommitdiff
path: root/src/server/main.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 19:38:36 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 20:26:41 +0000
commitfc68749669a3bd9e0530d5958b100262537f142a (patch)
tree64d170e2b8f6e0190caf1ed1e2749cc043eed024 /src/server/main.zig
parent987dc492a6ad8e3b4bd2f369d676a2d588342543 (diff)
gracefully exit
simplify code clean up dead code
Diffstat (limited to 'src/server/main.zig')
-rw-r--r--src/server/main.zig126
1 files changed, 37 insertions, 89 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index d8e8e61..dab0f0a 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -25,25 +25,45 @@ fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
keep_running.store(false, .monotonic);
}
-pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
+pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
// Configure the signal action
- // const act = std.posix.Sigaction{
- // .handler = .{ .handler = handleSigInt },
- // .mask = std.posix.sigemptyset(),
- // .flags = 0,
- // };
+ const act = std.posix.Sigaction{
+ .handler = .{ .handler = handleSigInt },
+ .mask = std.posix.sigemptyset(),
+ .flags = 0,
+ };
- // // Register the handler for SIGINT (Ctrl+C)
- // std.posix.sigaction(std.posix.SIG.INT, &act, null);
+ // Register the handler for SIGINT (Ctrl+C)
+ std.posix.sigaction(std.posix.SIG.INT, &act, null);
- var server: Server = .{
- .info = server_config,
- };
+ {
+ var dba: std.heap.DebugAllocator(.{}) = .init;
+ dba.backing_allocator = alloc;
+ defer _ = dba.deinit();
+ const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else alloc;
+
+ var server: Server = .{
+ .info = server_config,
+ };
+
+ var threaded: std.Io.Threaded = .init(gpa, .{});
+ defer threaded.deinit();
+ const io = threaded.io();
- var threaded: std.Io.Threaded = .init(gpa, .{});
- defer threaded.deinit();
- const io = threaded.io();
+ var server_task = try io.concurrent(start, .{ &server, io, gpa });
+ defer server_task.cancel(io) catch {};
+
+ while (keep_running.load(.monotonic)) {
+ try io.sleep(.fromMilliseconds(1), .awake);
+ }
+
+ std.debug.print("\nShutting down...\n", .{});
+ server_task.cancel(io) catch {};
+ }
+ std.debug.print("Goodbye\n", .{});
+}
+pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -52,46 +72,20 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
var id: usize = 0;
// Run until SIGINT is handled, then exit gracefully
- while (keep_running.load(.monotonic)) : (id +%= 1) {
+ while (true) : (id +%= 1) {
std.debug.print("in server loop\n", .{});
if (server.clients.contains(id)) continue;
const stream = try tcp_server.accept(io);
std.debug.print("accepted connection\n", .{});
- _ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch {
+ _ = io.concurrent(handleConnection, .{ server, gpa, io, id, stream }) catch {
std.debug.print("could not start concurrent handler for {d}\n", .{id});
stream.close(io);
};
}
-
- std.debug.print("Exiting gracefully\n", .{});
-}
-
-fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
- while (true) {
- const msg = server.msg_queue.getOne(io) catch break;
- defer msg.deinit(alloc);
-
- for (server.subscriptions.items) |subscription| {
- if (subjectMatches(subscription.subject, msg.subject)) {
- const client = server.clients.get(subscription.client_id) orelse {
- std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
- continue;
- };
- client.send(io, .{ .msg = .{
- .subject = msg.subject,
- .sid = subscription.sid,
- .reply_to = msg.reply_to,
- .payload = msg.payload,
- } }) catch continue;
- }
- }
- }
}
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
- // server.clients.lockPointers();
try server.clients.put(allocator, id, client);
- // server.clients.unlockPointers();
}
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
@@ -185,13 +179,6 @@ fn handleConnection(
}
}
-// // Result is owned by the caller
-// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState {
-// var acc: std.ArrayList(ClientState) = .empty;
-
-// return acc.toOwnedSlice();
-// }
-
fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
@@ -200,7 +187,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
- source_client.send(io, .@"-err") catch {};
+ source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
}
}
}
@@ -259,42 +246,3 @@ pub fn createId() []const u8 {
pub fn createName() []const u8 {
return "SERVERNAME";
}
-
-// TESTING
-
-// fn initTestServer() Server {
-// return .{
-// .info = .{
-// .server_id = "ABCD",
-// .server_name = "test server",
-// .version = "0.1.2",
-// .max_payload = 1234,
-// },
-// };
-// }
-
-// fn initTestClient(
-// io: std.Io,
-// allocator: std.mem.Allocator,
-// id: usize,
-// data_from: []const u8,
-// ) !struct {
-// Client,
-// *std.Io.Reader,
-// *std.Io.Writer,
-// } {
-// return .init(io, allocator, id, .{}, in, out);
-// }
-
-// test {
-// const gpa = std.testing.allocator;
-// const io = std.testing.io;
-
-// const server = initTestServer();
-// const client: Client = .init(
-// io,
-// gpa,
-// 1,
-// .{},
-// );
-// }