const std = @import("std");
const Message = @import("./message_parser.zig");
const ClientState = @import("./client.zig");

/// Server metadata advertised to clients.
///
/// `writeInfo` serializes a value of this type as JSON directly after the
/// `INFO ` prefix, so the field names below are what appears on the wire.
/// Fields without a default must be supplied by whoever builds the value.
const ServerInfo = struct {
    /// The unique identifier of the NATS server.
    server_id: []const u8,
    /// The name of the NATS server.
    server_name: []const u8,
    /// The version of NATS.
    version: []const u8,
    /// The version of golang the NATS server was built with.
    /// Defaults to "0.0.0".
    go: []const u8 = "0.0.0",
    /// The IP address used to start the NATS server.
    /// By default this is 0.0.0.0; it can be configured
    /// with -client_advertise host:port.
    host: []const u8 = "0.0.0.0",
    /// The port number the NATS server is configured
    /// to listen on. Defaults to 6868.
    port: u16 = 6868,
    /// Whether the server supports headers. Defaults to false.
    headers: bool = false,
    /// Maximum payload size, in bytes, that the server
    /// will accept from the client.
    max_payload: u64,
    /// An integer indicating the protocol version of
    /// the server. The server version 1.2.0 sets this
    /// to 1 to indicate that it supports the "Echo"
    /// feature.
    proto: u32 = 1,
};

/// Metadata describing this server instance.
server_info: ServerInfo,
/// State of known clients, starting out empty.
/// NOTE(review): the u64 key is presumably the same client ID stored in
/// `subscriptions` — confirm against the code that inserts into this map.
clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty,
/// Map of subjects to client IDs that are subscribed to that subject.
subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)) = .empty,

/// Entry point of this module.
///
/// Currently a stub: all three parameters are discarded and nothing is
/// started.
pub fn main(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: anytype) !void {
    _ = gpa;
    _ = iter;
    _ = main_args;
}

/// Serves one client connection and closes `stream` when done.
///
/// The stream is wrapped in a buffered writer (1 KiB) and a buffered reader
/// (8 KiB), both backed by stack buffers, and handed to `processClient`.
/// Because this function returns `void`, a failure cannot be propagated; it
/// is logged instead, and only this connection is dropped.
fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
    defer stream.close(io);

    var w_buffer: [1024]u8 = undefined;
    var writer = stream.writer(io, &w_buffer);
    const out = &writer.interface;

    var r_buffer: [8192]u8 = undefined;
    var reader = stream.reader(io, &r_buffer);
    const in = &reader.interface;

    processClient(allocator, in, out, info) catch |err| {
        // A misbehaving or disconnected client must not take the whole
        // server down; report the failure and let the deferred close run.
        std.log.err("error processing client: {s}", .{@errorName(err)});
    };
}

/// Runs the protocol for one client over `in` / `out`.
///
/// Sequence:
///   1. Send the `INFO` line built from `info`.
///   2. Read the first message, which must be a connect message.
///   3. Answer subsequent messages until the stream ends.
///
/// Memory: the connect message is parsed into an arena that lives for the
/// whole call; every later message is parsed into a second arena that is
/// reset (capacity retained) after each message. Both arenas draw from `gpa`.
///
/// Returns normally when `Message.next` reports `error.EndOfStream` inside
/// the message loop. Errors:
///   - `error.ExpectedConnect`: the first message was not a connect message.
///   - `error.UnexpectedConnect`: a connect message arrived after the handshake.
///   - `error.MessageNotImplemented`: a message type with no handler yet.
///   - anything else returned by `Message.next` or by writing to `out`.
fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
    try writeInfo(out, info);

    var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
    defer client_state_arena.deinit();
    // Check the tag explicitly: reading `.connect` from a union holding a
    // different tag is illegal behavior, and the tag is chosen by the client.
    const client_state = switch (try Message.next(client_state_arena.allocator(), in)) {
        .connect => |connect| connect,
        else => return error.ExpectedConnect,
    };
    _ = client_state;

    var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
    defer message_parsing_arena.deinit();
    const message_parsing_allocator = message_parsing_arena.allocator();

    while (true) {
        // Runs at the end of every iteration (including on break/return), so
        // memory for one message never outlives its handling.
        defer _ = message_parsing_arena.reset(.retain_capacity);

        const next_message = Message.next(message_parsing_allocator, in) catch |err| switch (err) {
            error.EndOfStream => break,
            else => return err,
        };

        switch (next_message) {
            .connect => return error.UnexpectedConnect,
            .ping => try writePong(out),
            // NOTE(review): `+OK` is sent for every publish; NATS only
            // acknowledges in verbose mode — revisit once the connect options
            // in `client_state` are actually used.
            .@"pub" => try writeOk(out),
            else => |msg| {
                std.log.err("message type not implemented: {s}", .{@tagName(msg)});
                return error.MessageNotImplemented;
            },
        }
    }
}

/// Sends the `+OK` line to the client and flushes `out`.
fn writeOk(out: *std.Io.Writer) !void {
    // `writeAll` rather than `write`: `write` may accept only part of the
    // slice, which would truncate the protocol line.
    try out.writeAll("+OK\r\n");
    try out.flush();
}

/// Sends the `PONG` line to the client and flushes `out`.
fn writePong(out: *std.Io.Writer) !void {
    try out.writeAll("PONG\r\n");
    try out.flush();
}

/// Sends `INFO <json>\r\n`, where `<json>` is `info` serialized with
/// `std.json.Stringify.value` using default options, then flushes `out`.
fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
    try out.writeAll("INFO ");
    try std.json.Stringify.value(info, .{}, out);
    try out.writeAll("\r\n");
    try out.flush();
}