summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.zig172
-rw-r--r--src/root.zig1
-rw-r--r--src/server/client.zig31
-rw-r--r--src/server/main.zig109
4 files changed, 165 insertions, 148 deletions
diff --git a/src/main.zig b/src/main.zig
index e24bbcb..ea188fb 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -3,6 +3,7 @@ const zits = @import("zits");
const clap = @import("clap");
const Message = zits.MessageParser.Message;
+const Server = zits.Server;
const SubCommands = enum {
help,
@@ -23,7 +24,7 @@ const main_params = clap.parseParamsComptime(
// To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to
// get the return type of `clap.parse` and `clap.parseEx`.
-const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers);
+pub const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers);
pub fn main() !void {
var dba: std.heap.DebugAllocator(.{}) = .init;
@@ -51,6 +52,7 @@ pub fn main() !void {
return err;
};
defer res.deinit();
+ std.debug.print("res: {any}\n", .{res});
if (res.args.help != 0)
return clap.helpToFile(.stderr(), clap.Help, &main_params, .{});
@@ -58,138 +60,44 @@ pub fn main() !void {
const command = res.positionals[0] orelse return error.MissingCommand;
switch (command) {
.help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}),
- .serve => try serverMain(gpa, &iter, res),
+ .serve => try Server.main(gpa, &iter, res),
.@"pub" => unreachable,
}
}
-const ServerInfo = struct {
- /// The unique identifier of the NATS server.
- server_id: []const u8,
- /// The name of the NATS server.
- server_name: []const u8,
- /// The version of NATS.
- version: []const u8,
- /// The version of golang the NATS server was built with.
- go: []const u8 = "0.0.0",
- /// The IP address used to start the NATS server,
- /// by default this will be 0.0.0.0 and can be
- /// configured with -client_advertise host:port.
- host: []const u8 = "0.0.0.0",
- /// The port number the NATS server is configured
- /// to listen on.
- port: u16 = 6868,
- /// Whether the server supports headers.
- headers: bool = false,
- /// Maximum payload size, in bytes, that the server
- /// will accept from the client.
- max_payload: u64,
- /// An integer indicating the protocol version of
- /// the server. The server version 1.2.0 sets this
- /// to 1 to indicate that it supports the "Echo"
- /// feature.
- proto: u32 = 1,
-};
-
-fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void {
- _ = iter;
- _ = main_args;
-
- var threaded: std.Io.Threaded = .init(gpa);
- defer threaded.deinit();
- const io = threaded.io();
-
- const info: ServerInfo = .{
- .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C",
- .server_name = "bar",
- .version = "2.11.8",
- .go = "go1.24.6",
- .headers = true,
- .max_payload = 1048576,
- };
-
- var server = try std.Io.net.IpAddress.listen(
- .{
- .ip4 = .{
- .bytes = .{ 0, 0, 0, 0 },
- .port = info.port,
- },
- },
- io,
- .{},
- );
- defer server.deinit(io);
-
- var group: std.Io.Group = .init;
- defer group.wait(io);
- for (0..5) |_| {
- const stream = try server.accept(io);
- group.async(io, handleConnection, .{ gpa, io, stream, info });
- }
-}
-
-fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
- defer stream.close(io);
- var w_buffer: [1024]u8 = undefined;
- var writer = stream.writer(io, &w_buffer);
- const out = &writer.interface;
-
- var r_buffer: [8192]u8 = undefined;
- var reader = stream.reader(io, &r_buffer);
- const in = &reader.interface;
-
- processClient(allocator, in, out, info) catch |err| {
- std.debug.panic("Error processing client: {}\n", .{err});
- };
-}
-
-fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
- try writeInfo(out, info);
-
- var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
- defer client_state_arena.deinit();
- const client_state = (try Message.next(client_state_arena.allocator(), in)).connect;
- _ = client_state;
-
- var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
- defer message_parsing_arena.deinit();
- const message_parsing_allocator = message_parsing_arena.allocator();
- while (true) {
- defer _ = message_parsing_arena.reset(.retain_capacity);
- const next_message = Message.next(message_parsing_allocator, in) catch |err| {
- switch (err) {
- error.EndOfStream => {
- break;
- },
- else => {
- return err;
- },
- }
- };
- switch (next_message) {
- .connect => |connect| {
- std.debug.panic("Connection message after already connected: {any}\n", .{connect});
- },
- .ping => try writePong(out),
- .@"pub" => try writeOk(out),
- else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}),
- }
- }
-}
-
-fn writeOk(out: *std.Io.Writer) !void {
- _ = try out.write("+OK\r\n");
- try out.flush();
-}
-
-fn writePong(out: *std.Io.Writer) !void {
- _ = try out.write("PONG\r\n");
- try out.flush();
-}
-
-fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
- _ = try out.write("INFO ");
- try std.json.Stringify.value(info, .{}, out);
- _ = try out.write("\r\n");
- try out.flush();
-}
+// fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void {
+// _ = iter;
+// _ = main_args;
+
+// var threaded: std.Io.Threaded = .init(gpa);
+// defer threaded.deinit();
+// const io = threaded.io();
+
+// const info: ServerInfo = .{
+// .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C",
+// .server_name = "bar",
+// .version = "2.11.8",
+// .go = "go1.24.6",
+// .headers = true,
+// .max_payload = 1048576,
+// };
+
+// var server = try std.Io.net.IpAddress.listen(
+// .{
+// .ip4 = .{
+// .bytes = .{ 0, 0, 0, 0 },
+// .port = info.port,
+// },
+// },
+// io,
+// .{},
+// );
+// defer server.deinit(io);
+
+// var group: std.Io.Group = .init;
+// defer group.wait(io);
+// for (0..5) |_| {
+// const stream = try server.accept(io);
+// group.async(io, handleConnection, .{ gpa, io, stream, info });
+// }
+// }
diff --git a/src/root.zig b/src/root.zig
index 12ebb2b..e46af60 100644
--- a/src/root.zig
+++ b/src/root.zig
@@ -1 +1,2 @@
pub const MessageParser = @import("server/message_parser.zig");
+pub const Server = @import("server/main.zig");
diff --git a/src/server/client.zig b/src/server/client.zig
index 8b49b89..bc02611 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -1,18 +1,17 @@
+const Message = @import("message_parser.zig").Message;
+const std = @import("std");
+
const ClientState = struct {
- verbose: bool = false,
- pedantic: bool = false,
- tls_required: bool = false,
- auth_token: ?[]const u8 = null,
- user: ?[]const u8 = null,
- pass: ?[]const u8 = null,
- name: ?[]const u8 = null,
- lang: []const u8,
- version: []const u8,
- protocol: u32,
- echo: ?bool = null,
- sig: ?[]const u8 = null,
- jwt: ?[]const u8 = null,
- no_responders: ?bool = null,
- headers: ?bool = null,
- nkey: ?[]const u8 = null,
+ id: u32,
+ /// Used to back `connect` strings.
+ string_buffer: [4096]u8,
+ connect: Message.Connect,
+ send_queue: std.Io.Queue(Message) = blk: {
+ var send_queue_buffer: [1024]Message = undefined;
+ break :blk .init(&send_queue_buffer);
+ },
+ recv_queue: std.Io.Queue(Message) = blk: {
+ var recv_queue_buffer: [1024]Message = undefined;
+ break :blk .init(&recv_queue_buffer);
+ },
};
diff --git a/src/server/main.zig b/src/server/main.zig
new file mode 100644
index 0000000..e0058a7
--- /dev/null
+++ b/src/server/main.zig
@@ -0,0 +1,109 @@
+const std = @import("std");
+const Message = @import("./message_parser.zig");
+
+const ClientState = @import("./client.zig");
+
+const ServerInfo = struct {
+ /// The unique identifier of the NATS server.
+ server_id: []const u8,
+ /// The name of the NATS server.
+ server_name: []const u8,
+ /// The version of NATS.
+ version: []const u8,
+ /// The version of golang the NATS server was built with.
+ go: []const u8 = "0.0.0",
+ /// The IP address used to start the NATS server,
+ /// by default this will be 0.0.0.0 and can be
+ /// configured with -client_advertise host:port.
+ host: []const u8 = "0.0.0.0",
+ /// The port number the NATS server is configured
+ /// to listen on.
+ port: u16 = 6868,
+ /// Whether the server supports headers.
+ headers: bool = false,
+ /// Maximum payload size, in bytes, that the server
+ /// will accept from the client.
+ max_payload: u64,
+ /// An integer indicating the protocol version of
+ /// the server. The server version 1.2.0 sets this
+ /// to 1 to indicate that it supports the "Echo"
+ /// feature.
+ proto: u32 = 1,
+};
+
+server_info: ServerInfo,
+clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty,
+/// Map of subjects to client IDs that are subscribed to that subject.
+subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)),
+
+pub fn main(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: anytype) !void {
+ _ = gpa;
+ _ = iter;
+ _ = main_args;
+}
+
+fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
+ defer stream.close(io);
+ var w_buffer: [1024]u8 = undefined;
+ var writer = stream.writer(io, &w_buffer);
+ const out = &writer.interface;
+
+ var r_buffer: [8192]u8 = undefined;
+ var reader = stream.reader(io, &r_buffer);
+ const in = &reader.interface;
+
+ processClient(allocator, in, out, info) catch |err| {
+ std.debug.panic("Error processing client: {}\n", .{err});
+ };
+}
+
+fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
+ try writeInfo(out, info);
+
+ var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
+ defer client_state_arena.deinit();
+ const client_state = (try Message.next(client_state_arena.allocator(), in)).connect;
+ _ = client_state;
+
+ var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
+ defer message_parsing_arena.deinit();
+ const message_parsing_allocator = message_parsing_arena.allocator();
+ while (true) {
+ defer _ = message_parsing_arena.reset(.retain_capacity);
+ const next_message = Message.next(message_parsing_allocator, in) catch |err| {
+ switch (err) {
+ error.EndOfStream => {
+ break;
+ },
+ else => {
+ return err;
+ },
+ }
+ };
+ switch (next_message) {
+ .connect => |connect| {
+ std.debug.panic("Connection message after already connected: {any}\n", .{connect});
+ },
+ .ping => try writePong(out),
+ .@"pub" => try writeOk(out),
+ else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}),
+ }
+ }
+}
+
+fn writeOk(out: *std.Io.Writer) !void {
+ _ = try out.write("+OK\r\n");
+ try out.flush();
+}
+
+fn writePong(out: *std.Io.Writer) !void {
+ _ = try out.write("PONG\r\n");
+ try out.flush();
+}
+
+fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
+ _ = try out.write("INFO ");
+ try std.json.Stringify.value(info, .{}, out);
+ _ = try out.write("\r\n");
+ try out.flush();
+}