diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 19:38:36 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 20:26:41 +0000 |
| commit | fc68749669a3bd9e0530d5958b100262537f142a (patch) | |
| tree | 64d170e2b8f6e0190caf1ed1e2749cc043eed024 /src | |
| parent | 987dc492a6ad8e3b4bd2f369d676a2d588342543 (diff) | |
gracefully exit
simplify code
clean up dead code
Diffstat (limited to 'src')
| -rw-r--r-- | src/server/client.zig | 57 | ||||
| -rw-r--r-- | src/server/main.zig | 126 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 137 |
3 files changed, 92 insertions, 228 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index f74e6b3..5c8d0cd 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -32,17 +32,31 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io for (msgs[0..len]) |msg| { switch (msg) { .@"+ok" => { - try writeOk(self.to_client); + _ = try self.to_client.write("+OK\r\n"); }, .pong => { - try writePong(self.to_client); + _ = try self.to_client.write("PONG\r\n"); }, .info => |info| { - try writeInfo(self.to_client, info); + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); }, .msg => |m| { defer m.deinit(alloc); - try writeMsg(self.to_client, m); + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + m.payload, + }, + ); + }, + .@"-err" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); }, else => |m| { std.debug.panic("unimplemented write: {any}\n", .{m}); @@ -60,42 +74,7 @@ pub fn send(self: *Client, io: std.Io, msg: Message) !void { } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { - // std.debug.print("in client awaiting next message\n", .{}); - // errdefer std.debug.print("actually it was canceled\n", .{}); - // defer std.debug.print("client returning next message!\n", .{}); return Message.next(allocator, self.from_client); - // return self.send_queue.getOne(io); -} - -fn writeOk(out: *std.Io.Writer) !void { - _ = try out.write("+OK\r\n"); -} - -fn writeErr(out: *std.Io.Writer, msg: []const u8) !void { - _ = try out.print("-ERR '{s}'\r\n", .{msg}); -} - -fn writePong(out: *std.Io.Writer) !void { - _ = try out.write("PONG\r\n"); -} - -pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { - _ = try out.write("INFO "); - try std.json.Stringify.value(info, .{}, out); - _ = try out.write("\r\n"); -} - -fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { - try out.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - msg.subject, - msg.sid, - msg.reply_to orelse "", - msg.payload.len, - msg.payload, - }, - ); } test { diff --git a/src/server/main.zig b/src/server/main.zig index d8e8e61..dab0f0a 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -25,25 +25,45 @@ fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void { keep_running.store(false, .monotonic); } -pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { +pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void { // Configure the signal action - // const act = std.posix.Sigaction{ - // .handler = .{ .handler = handleSigInt }, - // .mask = std.posix.sigemptyset(), - // .flags = 0, - // }; + const act = std.posix.Sigaction{ + .handler = .{ .handler = handleSigInt }, + .mask = std.posix.sigemptyset(), + .flags = 0, + }; - // // Register the handler for SIGINT (Ctrl+C) - // std.posix.sigaction(std.posix.SIG.INT, &act, null); + // Register the handler for SIGINT (Ctrl+C) + std.posix.sigaction(std.posix.SIG.INT, &act, null); - var server: Server = .{ - .info = server_config, - }; + { + var dba: std.heap.DebugAllocator(.{}) = .init; + dba.backing_allocator = alloc; + defer _ = dba.deinit(); + const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else alloc; + + var server: Server = .{ + .info = server_config, + }; + + var threaded: std.Io.Threaded = .init(gpa, .{}); + defer threaded.deinit(); + const io = threaded.io(); - var threaded: std.Io.Threaded = .init(gpa, .{}); - defer threaded.deinit(); - const io = threaded.io(); + var server_task = try io.concurrent(start, .{ &server, io, gpa }); + defer server_task.cancel(io) catch {}; + + while (keep_running.load(.monotonic)) { + try io.sleep(.fromMilliseconds(1), .awake); + } + + std.debug.print("\nShutting down...\n", .{}); + server_task.cancel(io) catch {}; + } + std.debug.print("Goodbye\n", .{}); +} +pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( server.info.host, server.info.port, @@ -52,46 +72,20 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { var id: usize = 0; // Run until SIGINT is handled, then exit gracefully - while (keep_running.load(.monotonic)) : (id +%= 1) { + while (true) : (id +%= 1) { std.debug.print("in server loop\n", .{}); if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); std.debug.print("accepted connection\n", .{}); - _ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch { + _ = io.concurrent(handleConnection, .{ server, gpa, io, id, stream }) catch { std.debug.print("could not start concurrent handler for {d}\n", .{id}); stream.close(io); }; } - - std.debug.print("Exiting gracefully\n", .{}); -} - -fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void { - while (true) { - const msg = server.msg_queue.getOne(io) catch break; - defer msg.deinit(alloc); - - for (server.subscriptions.items) |subscription| { - if (subjectMatches(subscription.subject, msg.subject)) { - const client = server.clients.get(subscription.client_id) orelse { - std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id}); - continue; - }; - client.send(io, .{ .msg = .{ - .subject = msg.subject, - .sid = subscription.sid, - .reply_to = msg.reply_to, - .payload = msg.payload, - } }) catch continue; - } - } - } } fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void { - // server.clients.lockPointers(); try server.clients.put(allocator, id, client); - // server.clients.unlockPointers(); } fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void { @@ -185,13 +179,6 @@ fn handleConnection( } } -// // Result is owned by the caller -// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState { -// var acc: std.ArrayList(ClientState) = .empty; - -// return acc.toOwnedSlice(); -// } - fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } @@ -200,7 +187,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ errdefer { if (source_client.connect) |c| { if (c.verbose) { - source_client.send(io, .@"-err") catch {}; + source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; } } } @@ -259,42 +246,3 @@ pub fn createId() []const u8 { pub fn createName() []const u8 { return "SERVERNAME"; } - -// TESTING - -// fn initTestServer() Server { -// return .{ -// .info = .{ -// .server_id = "ABCD", -// .server_name = "test server", -// .version = "0.1.2", -// .max_payload = 1234, -// }, -// }; -// } - -// fn initTestClient( -// io: std.Io, -// allocator: std.mem.Allocator, -// id: usize, -// data_from: []const u8, -// ) !struct { -// Client, -// *std.Io.Reader, -// *std.Io.Writer, -// } { -// return .init(io, allocator, id, .{}, in, out); -// } - -// test { -// const gpa = std.testing.allocator; -// const io = std.testing.io; - -// const server = initTestServer(); -// const client: Client = .init( -// io, -// gpa, -// 1, -// .{}, -// ); -// } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index b156dd6..6561c4b 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -43,7 +43,7 @@ pub const Message = union(MessageType) { ping, pong, @"+ok": void, - @"-err": void, + @"-err": []const u8, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, @@ -416,34 +416,6 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { return subject_list.toOwnedSlice(alloc); } -fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T { - var reader: std.json.Reader = .init(alloc, in); - return std.json.innerParse(T, alloc, &reader, .{ - .max_value_len = std.json.default_max_value_len, - .allocate = .alloc_always, - }); -} - -fn parsePub(in: *std.Io.Reader) !Message.Pub { - const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream; - const next = (try in.takeDelimiter('\r')) orelse return error.EndOfStream; - var reply_to: ?[]const u8 = null; - const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: { - reply_to = next; - break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10); - }; - - in.toss(1); // LF - const payload = try in.take(bytes); - - return .{ - .subject = subject, - .reply_to = reply_to, - .bytes = bytes, - .payload = payload, - }; -} - inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { @branchHint(.unlikely); @@ -451,74 +423,39 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void } } -// try returning error in debug mode, only null in release? -// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message { -// const message_type: MessageType = blk: { -// var word: ["CONNECT".len]u8 = undefined; -// var len: usize = 0; -// for (&word, 0..) |*b, i| { -// const byte = in.takeByte() catch return null; -// if (std.ascii.isUpper(byte)) { -// b.* = byte; -// len = i + 1; -// } else break; -// } - -// break :blk Message.parse(word[0..len]) orelse return null; -// }; - -// // defer in.toss(2); // CRLF -// return switch (message_type) { -// .connect => blk: { -// const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null }; - -// break :blk value; -// }, -// .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) }, -// .ping => .ping, -// else => null, -// }; -// } - -// test parseNextMessage { -// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n"; -// var reader: std.Io.Reader = .fixed(input); -// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); -// defer arena.deinit(); -// const gpa = arena.allocator(); - -// { -// const msg: Message = try Message.next(gpa, &reader); -// const expected: Message = .{ .connect = .{ -// .connect = .{ -// .verbose = false, -// .pedantic = false, -// .tls_required = false, -// .name = try gpa.dupe(u8, "NATS CLI Version v0.2.4"), -// .lang = try gpa.dupe(u8, "go"), -// .version = try gpa.dupe(u8, "1.43.0"), -// .protocol = 1, -// .echo = true, -// .headers = true, -// .no_responders = true, -// }, -// .allocator = arena, -// } }; - -// try std.testing.expectEqualDeep(expected, msg); -// } -// { -// const msg: Message = try Message.next(gpa, &reader); -// const expected: Message = .{ .@"pub" = .{ -// .subject = "hi", -// .payload = "foo", -// } }; -// try std.testing.expectEqualDeep(expected, msg); -// } -// } - -// test "MessageType.parse performance" { -// // Measure perf for parseMemEql -// // Measure perf for parseStaticStringMap -// // assert parse = fastest perf -// } +test "parsing a stream" { + const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ + "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ + ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ + "\nPUB hi 3\r\nfoo\r\n"; + var reader: std.Io.Reader = .fixed(input); + var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); + defer arena.deinit(); + const gpa = arena.allocator(); + + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ .connect = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, + } }; + + try std.testing.expectEqualDeep(expected, msg); + } + { + const msg: Message = try Message.next(gpa, &reader); + const expected: Message = .{ .@"pub" = .{ + .subject = "hi", + .payload = "foo", + } }; + try std.testing.expectEqualDeep(expected, msg); + } +} |
