const std = @import("std"); const zits = @import("zits"); const clap = @import("clap"); const Message = zits.MessageParser.Message; const SubCommands = enum { help, serve, @"pub", }; const main_parsers = .{ .command = clap.parsers.enumeration(SubCommands), }; // The parameters for `main`. Parameters for the subcommands are specified further down. const main_params = clap.parseParamsComptime( \\-h, --help Display this help and exit. \\ \\ ); // To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to // get the return type of `clap.parse` and `clap.parseEx`. const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers); pub fn main() !void { var dba: std.heap.DebugAllocator(.{}) = .init; defer _ = dba.deinit(); const gpa = dba.allocator(); var iter = try std.process.ArgIterator.initWithAllocator(gpa); defer iter.deinit(); _ = iter.next(); var diag = clap.Diagnostic{}; var res = clap.parseEx(clap.Help, &main_params, main_parsers, &iter, .{ .diagnostic = &diag, .allocator = gpa, // Terminate the parsing of arguments after parsing the first positional (0 is passed // here because parsed positionals are, like slices and arrays, indexed starting at 0). // // This will terminate the parsing after parsing the subcommand enum and leave `iter` // not fully consumed. It can then be reused to parse the arguments for subcommands. .terminating_positional = 0, }) catch |err| { try diag.reportToFile(.stderr(), err); return err; }; defer res.deinit(); if (res.args.help != 0) return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}); const command = res.positionals[0] orelse return error.MissingCommand; switch (command) { .help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}), .serve => try serverMain(gpa, &iter, res), .@"pub" => unreachable, } } const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, /// The name of the NATS server. server_name: []const u8, /// The version of NATS. version: []const u8, /// The version of golang the NATS server was built with. go: []const u8 = "0.0.0", /// The IP address used to start the NATS server, /// by default this will be 0.0.0.0 and can be /// configured with -client_advertise host:port. host: []const u8 = "0.0.0.0", /// The port number the NATS server is configured /// to listen on. port: u16 = 6868, /// Whether the server supports headers. headers: bool = false, /// Maximum payload size, in bytes, that the server /// will accept from the client. max_payload: u64, /// An integer indicating the protocol version of /// the server. The server version 1.2.0 sets this /// to 1 to indicate that it supports the "Echo" /// feature. proto: u32 = 1, }; fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void { _ = iter; _ = main_args; var threaded: std.Io.Threaded = .init(gpa); defer threaded.deinit(); const io = threaded.io(); const info: ServerInfo = .{ .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C", .server_name = "bar", .version = "2.11.8", .go = "go1.24.6", .headers = true, .max_payload = 1048576, }; var server = try std.Io.net.IpAddress.listen( .{ .ip4 = .{ .bytes = .{ 0, 0, 0, 0 }, .port = info.port, }, }, io, .{}, ); defer server.deinit(io); var group: std.Io.Group = .init; defer group.wait(io); for (0..5) |_| { const stream = try server.accept(io); group.async(io, handleConnection, .{ gpa, io, stream, info }); } } fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void { defer stream.close(io); var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); const out = &writer.interface; var r_buffer: [8192]u8 = undefined; var reader = stream.reader(io, &r_buffer); const in = &reader.interface; processClient(allocator, in, out, info) catch |err| { std.debug.panic("Error processing client: {}\n", .{err}); }; } fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void { try writeInfo(out, info); var client_state_arena: std.heap.ArenaAllocator = .init(gpa); defer client_state_arena.deinit(); const client_state = (try Message.next(client_state_arena.allocator(), in)).connect; _ = client_state; var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa); defer message_parsing_arena.deinit(); const message_parsing_allocator = message_parsing_arena.allocator(); while (true) { defer _ = message_parsing_arena.reset(.retain_capacity); const next_message = Message.next(message_parsing_allocator, in) catch |err| { switch (err) { error.EndOfStream => { break; }, else => { return err; }, } }; switch (next_message) { .connect => |connect| { std.debug.panic("Connection message after already connected: {any}\n", .{connect}); }, .ping => try writePong(out), .@"pub" => try writeOk(out), else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), } } } fn writeOk(out: *std.Io.Writer) !void { _ = try out.write("+OK\r\n"); try out.flush(); } fn writePong(out: *std.Io.Writer) !void { _ = try out.write("PONG\r\n"); try out.flush(); } fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { _ = try out.write("INFO "); try std.json.Stringify.value(info, .{}, out); _ = try out.write("\r\n"); try out.flush(); }