const std = @import("std");
const zits = @import("zits");
const clap = @import("clap");

const MessageType = zits.MessageParser.MessageType;
const parseNextMessage = zits.MessageParser.parseNextMessage;

/// Subcommands accepted as the first positional argument.
const SubCommands = enum {
    help,
    serve,
    @"pub",
};

/// Value parsers for `main_params`; the `command` positional is parsed into `SubCommands`.
const main_parsers = .{
    .command = clap.parsers.enumeration(SubCommands),
};

// The parameters for `main`. Parameters for the subcommands are specified further down.
const main_params = clap.parseParamsComptime(
    \\-h, --help  Display this help and exit.
    \\<command>
    \\
);

// To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to
// get the return type of `clap.parse` and `clap.parseEx`.
const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers);

/// Program entry point: parses the top-level arguments and dispatches to the selected
/// subcommand.
///
/// Returns `error.MissingCommand` when no subcommand is given and `error.NotImplemented`
/// for subcommands that are declared but not implemented yet. Argument parsing errors are
/// reported to stderr before being returned.
pub fn main() !void {
    var dba = std.heap.DebugAllocator(.{}){};
    defer _ = dba.deinit();
    const gpa = dba.allocator();

    var iter = try std.process.ArgIterator.initWithAllocator(gpa);
    defer iter.deinit();

    // Skip the executable name.
    _ = iter.next();

    var diag = clap.Diagnostic{};
    var res = clap.parseEx(clap.Help, &main_params, main_parsers, &iter, .{
        .diagnostic = &diag,
        .allocator = gpa,

        // Terminate the parsing of arguments after parsing the first positional (0 is passed
        // here because parsed positionals are, like slices and arrays, indexed starting at 0).
        //
        // This will terminate the parsing after parsing the subcommand enum and leave `iter`
        // not fully consumed. It can then be reused to parse the arguments for subcommands.
        .terminating_positional = 0,
    }) catch |err| {
        try diag.reportToFile(.stderr(), err);
        return err;
    };
    defer res.deinit();

    if (res.args.help != 0)
        return clap.helpToFile(.stderr(), clap.Help, &main_params, .{});

    const command = res.positionals[0] orelse return error.MissingCommand;
    switch (command) {
        .help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}),
        .serve => try serverMain(gpa, &iter, res),
        // This prong is reachable from the command line, so it must not be `unreachable`
        // (which is undefined behaviour in release builds). Fail with a real error instead.
        .@"pub" => {
            std.debug.print("the `pub` subcommand is not implemented yet\n", .{});
            return error.NotImplemented;
        },
    }
}

/// Payload serialized as JSON after the `INFO ` prefix when a client connects
/// (see `writeInfo`).
const ServerInfo = struct {
    /// The unique identifier of the NATS server.
    server_id: []const u8,
    /// The name of the NATS server.
    server_name: []const u8,
    /// The version of NATS.
    version: []const u8,
    /// The version of golang the NATS server was built with.
    go: []const u8 = "0.0.0",
    /// The IP address used to start the NATS server,
    /// by default this will be 0.0.0.0 and can be
    /// configured with -client_advertise host:port.
    host: []const u8 = "0.0.0.0",
    /// The port number the NATS server is configured
    /// to listen on.
    port: u16 = 6868,
    /// Whether the server supports headers.
    headers: bool = false,
    /// Maximum payload size, in bytes, that the server
    /// will accept from the client.
    max_payload: u64,
    /// An integer indicating the protocol version of
    /// the server. The server version 1.2.0 sets this
    /// to 1 to indicate that it supports the "Echo"
    /// feature.
    proto: u32 = 1,
};

/// Runs the `serve` subcommand: listens on 0.0.0.0 at `info.port` and hands each accepted
/// connection to `handleConnection` as an async task.
///
/// `iter` and `main_args` are currently unused; they are accepted so subcommand-specific
/// arguments can be parsed here later.
fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void {
    _ = iter;
    _ = main_args;

    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();

    const info: ServerInfo = .{
        .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C",
        .server_name = "bar",
        .version = "2.11.8",
        .go = "go1.24.6",
        .headers = true,
        .max_payload = 1048576,
    };

    // const info: ServerInfo = .{
    //     .server_id = "foo",
    //     .server_name = "bar",
    //     .version = "6.9.0",
    //     .max_payload = 6969,
    // };

    var server = try std.Io.net.IpAddress.listen(.{
        .ip4 = .{
            .bytes = .{ 0, 0, 0, 0 },
            .port = info.port,
        },
    }, io, .{});
    defer server.deinit(io);

    var group: std.Io.Group = .init;
    defer group.wait(io);

    // NOTE: only five connections are accepted in total; after that the function waits for
    // the outstanding handlers (via the deferred `group.wait`) and returns.
    for (0..5) |_| {
        const stream = try server.accept(io);
        group.async(io, handleConnection, .{ io, stream, info });
    }
}

/// Serves a single client connection and closes the stream when done.
///
/// A failure while talking to one client is logged rather than turned into a panic, so a
/// misbehaving or disconnecting client cannot take down the whole server.
fn handleConnection(io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
    defer stream.close(io);

    var w_buffer: [1024]u8 = undefined;
    var writer = stream.writer(io, &w_buffer);
    const out = &writer.interface;

    var r_buffer: [8192]u8 = undefined;
    var reader = stream.reader(io, &r_buffer);
    const in = &reader.interface;

    processClient(in, out, info) catch |err| {
        std.debug.print("Error processing client: {}\n", .{err});
    };

    // var stdout_buffer: [1024]u8 = undefined;
    // const stdout_file = std.fs.File.stdout();
    // var stdout_file_writer = stdout_file.writer(&stdout_buffer);
    // const stdout_writer = &stdout_file_writer.interface;

    // var timeout = io.async(std.Io.sleep, .{ io, .fromSeconds(10), .real });
    // defer timeout.cancel(io) catch {};

    // var user_res = io.async(std.Io.Reader.streamRemaining, .{ in, stdout_writer });
    // defer _ = user_res.cancel(io) catch {};

    // switch (io.select(.{
    //     .timeout = &timeout,
    //     .data = &user_res,
    // }) catch unreachable) {
    //     .timeout => std.debug.print("timeout\n", .{}),
    //     .data => |_| {
    //         stdout_writer.flush() catch |err| {
    //             std.debug.print("Could not flush stdout: {}\n", .{err});
    //         };
    //         // std.debug.print("received data {any}\n", .{d});
    //     },
    // }
}

/// Drives the protocol for one client: sends the INFO line, then handles messages parsed
/// from `in` until `parseNextMessage` yields `null`.
///
/// Errors from writing to `out` are returned to the caller.
fn processClient(in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
    try writeInfo(out, info);

    // move this inside client_state declaration
    var json_parse_buf: [4096]u8 = undefined;
    var json_parse_alloc_fb: std.heap.FixedBufferAllocator = std.heap.FixedBufferAllocator.init(&json_parse_buf);
    var json_parse_alloc = json_parse_alloc_fb.allocator();
    var json_reader: std.json.Reader = .init(json_parse_alloc, in);

    // var client_state = try std.json.parseFromSliceLeaky(ClientState, json_parse_alloc, in.buffered(), .{});
    // in.toss(in.buffered().len);
    // var client_state = try std.json.parseFromTokenSourceLeaky(ClientState, json_parse_alloc, &json_reader, .{});
    const client_state = 0;
    std.debug.print("client_state: {}\n", .{client_state});

    while (true) {
        // NOTE(review): the fixed buffer is only reset in the `.connect` prong below; if
        // `parseNextMessage` allocates for other message kinds the 4096 bytes could run
        // out on a long-lived connection — confirm against the parser.
        const next_message_type = parseNextMessage(json_parse_alloc, in) orelse return;
        switch (next_message_type) {
            .connect => |connect| {
                std.debug.print("connect: {s}\n", .{connect.name orelse "\"\""});
                // Start over with an empty parse buffer for the next message.
                json_parse_alloc_fb = .init(&json_parse_buf);
                json_parse_alloc = json_parse_alloc_fb.allocator();
                json_reader = .init(json_parse_alloc, in);
                // client_state = try std.json.parseFromTokenSourceLeaky(ClientState, json_parse_alloc, &json_reader, .{});
                std.debug.print("client_state: {any}\n", .{client_state});
            },
            // Propagate write failures instead of panicking; the caller decides what to do.
            .ping => try writePong(out),
            else => |msg| std.debug.print("received {}\n", .{msg}),
        }
    }
}

/// Writes `PONG\r\n` to `out` and flushes it.
fn writePong(out: *std.Io.Writer) !void {
    std.debug.print("in writePong\n", .{});
    // `writeAll` rather than `write`: `write` may accept only part of the bytes.
    try out.writeAll("PONG");
    try out.writeAll("\r\n");
    try out.flush();
}

/// Writes `INFO <json>\r\n` to `out`, where `<json>` is `info` serialized with
/// `std.json.Stringify`, and flushes it.
fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
    // `writeAll` rather than `write`: `write` may accept only part of the bytes.
    try out.writeAll("INFO ");
    try std.json.Stringify.value(info, .{}, out);
    try out.writeAll("\r\n");
    try out.flush();
}