diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2025-11-29 18:30:39 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2025-11-29 18:30:39 -0500 |
| commit | bd9ed88e5c7e112f2f4be8234fd11dd9db82d111 (patch) | |
| tree | 5cb8a963599585a974e33b54b0cf04b54eb3e529 /src/server/main.zig | |
| parent | c6dfcc541d14a934ab739cb56f4e11882f46e9ea (diff) | |
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/src/server/main.zig b/src/server/main.zig new file mode 100644 index 0000000..e0058a7 --- /dev/null +++ b/src/server/main.zig @@ -0,0 +1,109 @@ +const std = @import("std"); +const Message = @import("./message_parser.zig"); + +const ClientState = @import("./client.zig"); + +const ServerInfo = struct { + /// The unique identifier of the NATS server. + server_id: []const u8, + /// The name of the NATS server. + server_name: []const u8, + /// The version of NATS. + version: []const u8, + /// The version of golang the NATS server was built with. + go: []const u8 = "0.0.0", + /// The IP address used to start the NATS server, + /// by default this will be 0.0.0.0 and can be + /// configured with -client_advertise host:port. + host: []const u8 = "0.0.0.0", + /// The port number the NATS server is configured + /// to listen on. + port: u16 = 6868, + /// Whether the server supports headers. + headers: bool = false, + /// Maximum payload size, in bytes, that the server + /// will accept from the client. + max_payload: u64, + /// An integer indicating the protocol version of + /// the server. The server version 1.2.0 sets this + /// to 1 to indicate that it supports the "Echo" + /// feature. + proto: u32 = 1, +}; + +server_info: ServerInfo, +clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty, +/// Map of subjects to client IDs that are subscribed to that subject. +subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)), + +pub fn main(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: anytype) !void { + _ = gpa; + _ = iter; + _ = main_args; +} + +fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void { + defer stream.close(io); + var w_buffer: [1024]u8 = undefined; + var writer = stream.writer(io, &w_buffer); + const out = &writer.interface; + + var r_buffer: [8192]u8 = undefined; + var reader = stream.reader(io, &r_buffer); + const in = &reader.interface; + + processClient(allocator, in, out, info) catch |err| { + std.debug.panic("Error processing client: {}\n", .{err}); + }; +} + +fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void { + try writeInfo(out, info); + + var client_state_arena: std.heap.ArenaAllocator = .init(gpa); + defer client_state_arena.deinit(); + const client_state = (try Message.next(client_state_arena.allocator(), in)).connect; + _ = client_state; + + var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa); + defer message_parsing_arena.deinit(); + const message_parsing_allocator = message_parsing_arena.allocator(); + while (true) { + defer _ = message_parsing_arena.reset(.retain_capacity); + const next_message = Message.next(message_parsing_allocator, in) catch |err| { + switch (err) { + error.EndOfStream => { + break; + }, + else => { + return err; + }, + } + }; + switch (next_message) { + .connect => |connect| { + std.debug.panic("Connection message after already connected: {any}\n", .{connect}); + }, + .ping => try writePong(out), + .@"pub" => try writeOk(out), + else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}), + } + } +} + +fn writeOk(out: *std.Io.Writer) !void { + _ = try out.write("+OK\r\n"); + try out.flush(); +} + +fn writePong(out: *std.Io.Writer) !void { + _ = try out.write("PONG\r\n"); + try out.flush(); +} + +fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void { + _ = try out.write("INFO "); + try std.json.Stringify.value(info, .{}, out); + _ = try out.write("\r\n"); + try out.flush(); +} |
