diff options
| -rw-r--r-- | src/server/client.zig | 18 | ||||
| -rw-r--r-- | src/server/main.zig | 32 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 13 |
3 files changed, 43 insertions, 20 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 87bb177..c7aa41a 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -77,7 +77,7 @@ pub const ClientState = struct { } /// Return true if the value was put in the clients buffer to process, else false. - pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable||std.Io.QueueClosedError)!bool { + pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!bool { try self.recv_queue.putOne(io, msg); return true; } @@ -110,6 +110,16 @@ pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { std.debug.print("PRINTING MESSAGE\n\n\n\n", .{}); + std.debug.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n-\n\n\n", + .{ + msg.subject, + msg.sid, + msg.reply_to orelse "", + msg.payload.len, + msg.payload, + }, + ); try out.print( "MSG {s} {s} {s} {d}\r\n{s}\r\n", .{ @@ -135,6 +145,7 @@ test { var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf); { + // Simulate stream while (Message.next(gpa, &from_client)) |msg| { switch (msg) { .eos => { @@ -145,7 +156,10 @@ test { try from_client_queue.putOne(io, msg); }, } - } else |_| {} + } else |err| switch (err) { + error.EndOfStream => try from_client_queue.close(io), + else => return err, + } while (from_client_queue.getOne(io)) |msg| { switch (msg) { diff --git a/src/server/main.zig b/src/server/main.zig index e1f9891..f3702e9 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -10,7 +10,24 @@ clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty, /// Map of subjects to a map of (client ID => SID) subscriptions: std.StringHashMapUnmanaged(std.AutoHashMapUnmanaged(usize, []const u8)) = .empty, +var keep_running = std.atomic.Value(bool).init(true); + +fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void { + _ = sig; + keep_running.store(false, .monotonic); +} + pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { + // Configure the signal action + // const act = std.posix.Sigaction{ + // .handler = .{ .handler = handleSigInt }, + // .mask = std.posix.sigemptyset(), + // .flags = 0, + // }; + + // // Register the handler for SIGINT (Ctrl+C) + // std.posix.sigaction(std.posix.SIG.INT, &act, null); + var server: Server = .{ .info = server_config, }; @@ -26,7 +43,8 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { defer tcp_server.deinit(io); var id: usize = 0; - while (true) : (id +%= 1) { + // Run until SIGINT is handled, then exit gracefully + while (keep_running.load(.monotonic)) : (id +%= 1) { std.debug.print("in server loop\n", .{}); if (server.clients.contains(id)) continue; const stream = try tcp_server.accept(io); @@ -36,6 +54,8 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void { stream.close(io); }; } + + std.debug.print("Exiting gracefully\n", .{}); } fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *ClientState) !void { @@ -58,8 +78,8 @@ fn handleConnection( id: usize, stream: std.Io.net.Stream, ) !void { - _ = server_allocator; var client_allocator: std.heap.DebugAllocator(.{}) = .init; + client_allocator.backing_allocator = server_allocator; defer { std.debug.print("deinitializing debug allocator\n", .{}); _ = client_allocator.deinit(); @@ -113,9 +133,6 @@ fn handleConnection( .unsub => |unsub| { try server.unsubscribe(client_state.id, unsub); }, - .eos => { - break; - }, else => |e| { std.debug.panic("Unimplemented message: {any}\n", .{e}); }, @@ -123,7 +140,10 @@ fn handleConnection( std.debug.print("processed message from client\n", .{}); std.debug.print("awaiting next message from client\n", .{}); - } else |_| {} + } else |err| { + // This is probably going to be normal on disconnect + std.debug.print("Ran into error in client process loop: {}\n", .{err}); + } // client_state.task.await(io); } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 0ff3dd4..ed5bc76 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -13,7 +13,6 @@ pub const MessageType = enum { pong, @"+ok", @"-err", - eos, fn parseMemEql(input: []const u8) ?MessageType { // if (std.mem.eql(u8, "INFO", input)) return .info; @@ -45,9 +44,6 @@ pub const Message = union(MessageType) { pong, @"+ok": void, @"-err": void, - // Not an actual NATS message, but used to signal end of stream was reached in the input, - // and we should close the reader. - eos: void, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, @@ -176,14 +172,7 @@ pub const Message = union(MessageType) { try operation_string.appendBounded(byte); try in.discardAll(1); } else break; - } else |err| switch (err) { - error.EndOfStream => { - return .{ .eos = {} }; - }, - else => { - return err; - }, - } + } else |err| return err; const operation = parse(operation_string.items) orelse { return error.InvalidOperation; |
