From bd9ed88e5c7e112f2f4be8234fd11dd9db82d111 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sat, 29 Nov 2025 18:30:39 -0500 Subject: --- src/main.zig | 172 ++++++++++++-------------------------------------- src/root.zig | 1 + src/server/client.zig | 31 +++++---- src/server/main.zig | 109 ++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 148 deletions(-) create mode 100644 src/server/main.zig (limited to 'src') diff --git a/src/main.zig b/src/main.zig index e24bbcb..ea188fb 100644 --- a/src/main.zig +++ b/src/main.zig @@ -3,6 +3,7 @@ const zits = @import("zits"); const clap = @import("clap"); const Message = zits.MessageParser.Message; +const Server = zits.Server; const SubCommands = enum { help, @@ -23,7 +24,7 @@ const main_params = clap.parseParamsComptime( // To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to // get the return type of `clap.parse` and `clap.parseEx`. -const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers); +pub const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers); pub fn main() !void { var dba: std.heap.DebugAllocator(.{}) = .init; @@ -51,6 +52,7 @@ pub fn main() !void { return err; }; defer res.deinit(); + std.debug.print("res: {any}\n", .{res}); if (res.args.help != 0) return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}); @@ -58,138 +60,44 @@ pub fn main() !void { const command = res.positionals[0] orelse return error.MissingCommand; switch (command) { .help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}), - .serve => try serverMain(gpa, &iter, res), + .serve => try Server.main(gpa, &iter, res), .@"pub" => unreachable, } } -const ServerInfo = struct { - /// The unique identifier of the NATS server. - server_id: []const u8, - /// The name of the NATS server. - server_name: []const u8, - /// The version of NATS. - version: []const u8, - /// The version of golang the NATS server was built with. - go: []const u8 = "0.0.0", - /// The IP address used to start the NATS server, - /// by default this will be 0.0.0.0 and can be - /// configured with -client_advertise host:port. - host: []const u8 = "0.0.0.0", - /// The port number the NATS server is configured - /// to listen on. - port: u16 = 6868, - /// Whether the server supports headers. - headers: bool = false, - /// Maximum payload size, in bytes, that the server - /// will accept from the client. - max_payload: u64, - /// An integer indicating the protocol version of - /// the server. The server version 1.2.0 sets this - /// to 1 to indicate that it supports the "Echo" - /// feature. - proto: u32 = 1, -}; - -fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void { - _ = iter; - _ = main_args; - - var threaded: std.Io.Threaded = .init(gpa); - defer threaded.deinit(); - const io = threaded.io(); - - const info: ServerInfo = .{ - .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C", - .server_name = "bar", - .version = "2.11.8", - .go = "go1.24.6", - .headers = true, - .max_payload = 1048576, - }; - - var server = try std.Io.net.IpAddress.listen( - .{ - .ip4 = .{ - .bytes = .{ 0, 0, 0, 0 }, - .port = info.port, - }, - }, - io, - .{}, - ); - defer server.deinit(io); - - var group: std.Io.Group = .init; - defer group.wait(io); - for (0..5) |_| { - const stream = try server.accept(io); - group.async(io, handleConnection, .{ gpa, io, stream, info }); - } -} - -fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void { - defer stream.close(io); - var w_buffer: [1024]u8 = undefined; - var writer = stream.writer(io, &w_buffer); - const out = &writer.interface; - - var r_buffer: [8192]u8 = undefined; - var reader = stream.reader(io, &r_buffer); - const in = &reader.interface; - - processClient(allocator, in, out, info) catch |err| { - std.debug.panic("Error processing client: {}\n", .{err}); - }; -} - -fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void { - try writeInfo(out, info); - - var client_state_arena: std.heap.ArenaAllocator = .init(gpa); - defer client_state_arena.deinit(); - const client_state = (try Message.next(client_state_arena.allocator(), in)).connect; - _ = client_state; - - var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa); - defer message_parsing_arena.deinit(); - const message_parsing_allocator = message_parsing_arena.allocator(); - while (true) { - defer _ = message_parsing_arena.reset(.retain_capacity); - const next_message = Message.next(message_parsing_allocator, in) catch |err| { - switch (err) { - error.EndOfStream => { - break; - }, - else => { - return err; - }, - } - }; - switch (next_message) { - .connect => |connect| { - std.debug.panic("Connection message after already connected: {any}\n", .{connect}); - }, - .ping => try writePong(out), - .@"pub" => try writeOk(out), - else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), - } - } -} - -fn writeOk(out: *std.Io.Writer) !void { - _ = try out.write("+OK\r\n"); - try out.flush(); -} - -fn writePong(out: *std.Io.Writer) !void { - _ = try out.write("PONG\r\n"); - try out.flush(); -} - -fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { - _ = try out.write("INFO "); - try std.json.Stringify.value(info, .{}, out); - _ = try out.write("\r\n"); - try out.flush(); -} +// fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void { +// _ = iter; +// _ = main_args; + +// var threaded: std.Io.Threaded = .init(gpa); +// defer threaded.deinit(); +// const io = threaded.io(); + +// const info: ServerInfo = .{ +// .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C", +// .server_name = "bar", +// .version = "2.11.8", +// .go = "go1.24.6", +// .headers = true, +// .max_payload = 1048576, +// }; + +// var server = try std.Io.net.IpAddress.listen( +// .{ +// .ip4 = .{ +// .bytes = .{ 0, 0, 0, 0 }, +// .port = info.port, +// }, +// }, +// io, +// .{}, +// ); +// defer server.deinit(io); + +// var group: std.Io.Group = .init; +// defer group.wait(io); +// for (0..5) |_| { +// const stream = try server.accept(io); +// group.async(io, handleConnection, .{ gpa, io, stream, info }); +// } +// } diff --git a/src/root.zig b/src/root.zig index 12ebb2b..e46af60 100644 --- a/src/root.zig +++ b/src/root.zig @@ -1 +1,2 @@ pub const MessageParser = @import("server/message_parser.zig"); +pub const Server = @import("server/main.zig"); diff --git a/src/server/client.zig b/src/server/client.zig index 8b49b89..bc02611 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -1,18 +1,17 @@ +const Message = @import("message_parser.zig").Message; +const std = @import("std"); + const ClientState = struct { - verbose: bool = false, - pedantic: bool = false, - tls_required: bool = false, - auth_token: ?[]const u8 = null, - user: ?[]const u8 = null, - pass: ?[]const u8 = null, - name: ?[]const u8 = null, - lang: []const u8, - version: []const u8, - protocol: u32, - echo: ?bool = null, - sig: ?[]const u8 = null, - jwt: ?[]const u8 = null, - no_responders: ?bool = null, - headers: ?bool = null, - nkey: ?[]const u8 = null, + id: u32, + /// Used to back `connect` strings. + string_buffer: [4096]u8, + connect: Message.Connect, + send_queue: std.Io.Queue(Message) = blk: { + var send_queue_buffer: [1024]Message = undefined; + break :blk .init(&send_queue_buffer); + }, + recv_queue: std.Io.Queue(Message) = blk: { + var recv_queue_buffer: [1024]Message = undefined; + break :blk .init(&recv_queue_buffer); + }, }; diff --git a/src/server/main.zig b/src/server/main.zig new file mode 100644 index 0000000..e0058a7 --- /dev/null +++ b/src/server/main.zig @@ -0,0 +1,109 @@ +const std = @import("std"); +const Message = @import("./message_parser.zig"); + +const ClientState = @import("./client.zig"); + +const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 6868, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, +}; + +server_info: ServerInfo, +clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty, +/// Map of subjects to client IDs that are subscribed to that subject. +subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)), + +pub fn main(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: anytype) !void { + _ = gpa; + _ = iter; + _ = main_args; +} + +fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void { + defer stream.close(io); + var w_buffer: [1024]u8 = undefined; + var writer = stream.writer(io, &w_buffer); + const out = &writer.interface; + + var r_buffer: [8192]u8 = undefined; + var reader = stream.reader(io, &r_buffer); + const in = &reader.interface; + + processClient(allocator, in, out, info) catch |err| { + std.debug.panic("Error processing client: {}\n", .{err}); + }; +} + +fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void { + try writeInfo(out, info); + + var client_state_arena: std.heap.ArenaAllocator = .init(gpa); + defer client_state_arena.deinit(); + const client_state = (try Message.next(client_state_arena.allocator(), in)).connect; + _ = client_state; + + var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa); + defer message_parsing_arena.deinit(); + const message_parsing_allocator = message_parsing_arena.allocator(); + while (true) { + defer _ = message_parsing_arena.reset(.retain_capacity); + const next_message = Message.next(message_parsing_allocator, in) catch |err| { + switch (err) { + error.EndOfStream => { + break; + }, + else => { + return err; + }, + } + }; + switch (next_message) { + .connect => |connect| { + std.debug.panic("Connection message after already connected: {any}\n", .{connect}); + }, + .ping => try writePong(out), + .@"pub" => try writeOk(out), + else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), + } + } +} + +fn writeOk(out: *std.Io.Writer) !void { + _ = try out.write("+OK\r\n"); + try out.flush(); +} + +fn writePong(out: *std.Io.Writer) !void { + _ = try out.write("PONG\r\n"); + try out.flush(); +} + +fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { + _ = try out.write("INFO "); + try std.json.Stringify.value(info, .{}, out); + _ = try out.write("\r\n"); + try out.flush(); +} -- cgit