summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-06 18:45:17 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-06 20:43:49 -0500
commitb87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch)
tree00613d0d3f7178d0c5b974ce04a752443e9a816e /src
parent025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff)
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src')
-rw-r--r--src/Server.zig (renamed from src/server/Server.zig)15
-rw-r--r--src/Server/Client.zig (renamed from src/server/Client.zig)189
-rw-r--r--src/Server/message_parser.zig (renamed from src/server/message_parser.zig)0
-rw-r--r--src/main.zig26
-rw-r--r--src/root.zig4
-rw-r--r--src/subcommand/server.zig (renamed from src/server/main.zig)0
6 files changed, 124 insertions, 110 deletions
diff --git a/src/server/Server.zig b/src/Server.zig
index 18214ae..e7d00b1 100644
--- a/src/server/Server.zig
+++ b/src/Server.zig
@@ -11,16 +11,18 @@ const Mutex = Io.Mutex;
const Queue = Io.Queue;
const Stream = std.Io.net.Stream;
-const message_parser = @import("./message_parser.zig");
+pub const Client = @import("./Server/Client.zig");
+
+const message_parser = @import("./Server/message_parser.zig");
+
pub const MessageType = message_parser.MessageType;
pub const Message = message_parser.Message;
const ServerInfo = Message.ServerInfo;
-pub const Client = @import("./Client.zig");
+
const Msgs = Client.Msgs;
const Server = @This();
const builtin = @import("builtin");
-const safe_build = builtin.mode == .Debug or builtin.mode == .ReleaseSafe;
pub const Subscription = struct {
subject: []const u8,
@@ -39,7 +41,7 @@ pub const Subscription = struct {
};
const eql = std.mem.eql;
-const log = std.log;
+const log = std.log.scoped(.zits);
const panic = std.debug.panic;
info: ServerInfo,
@@ -147,7 +149,10 @@ fn handleConnection(
var dba: std.heap.DebugAllocator(.{}) = .init;
dba.backing_allocator = server_allocator;
defer _ = dba.deinit();
- const alloc = if (safe_build) dba.allocator() else server_allocator;
+ const alloc = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe)
+ dba.allocator()
+ else
+ server_allocator;
// Set up client writer
const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size);
diff --git a/src/server/Client.zig b/src/Server/Client.zig
index 690cabf..dff3534 100644
--- a/src/server/Client.zig
+++ b/src/Server/Client.zig
@@ -132,122 +132,109 @@ pub fn start(self: *Client, io: std.Io) !void {
}
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
- try self.recv_queue.putOne(io, msg);
+ switch (msg) {
+ .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
+ .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
+ else => try self.recv_queue.putOne(io, msg),
+ }
+}
+
+test send {
+ const io = std.testing.io;
+ const gpa = std.testing.allocator;
+
+ var to_client: std.Io.Writer = .fixed(blk: {
+ var buf: [1024]u8 = undefined;
+ break :blk &buf;
+ });
+ var recv_queue: Queue(Message) = .init(&.{});
+ var msgs_queue: Queue(Msgs) = .init(blk: {
+ var buf: [1]Msgs = undefined;
+ break :blk &buf;
+ });
+ var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
+ defer client.deinit(gpa);
+
+ var c_task = try io.concurrent(Client.start, .{ &client, io });
+ defer c_task.cancel(io) catch {};
+
+ {
+ try client.send(io, .PONG);
+ // Wait for the concurrent client task to write to the writer
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
+ }
+
+ to_client.end = 0;
+
+ {
+ const payload = "payload";
+ const msg: Message.Msg = .{
+ .sid = "1",
+ .subject = "subject",
+ .reply_to = "reply",
+ .payload = .{
+ .len = payload.len,
+ .short = blk: {
+ var buf: [128]u8 = undefined;
+ @memcpy(buf[0..payload.len], payload);
+ break :blk buf;
+ },
+ .long = null,
+ },
+ };
+ try client.send(io, .{
+ // msg must be owned by the allocator the client uses
+ .MSG = try msg.dupe(gpa),
+ });
+ try io.sleep(.fromMilliseconds(1), .awake);
+ try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered());
+ }
}
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
return Message.next(allocator, self.from_client);
}
-test {
- const io = std.testing.io;
+test next {
const gpa = std.testing.allocator;
var from_client: std.Io.Reader = .fixed(
- "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
+ "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
+ "equired\":false,\"name\":\"NATS CLI Version v0.2." ++
+ "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
+ "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
"PING\r\n",
);
- var from_client_buf: [1024]Message = undefined;
- var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf);
+
+ var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined);
{
// Simulate stream
- while (Message.next(gpa, &from_client)) |msg| {
- try from_client_queue.putOne(io, msg);
- } else |err| switch (err) {
- error.EndOfStream => from_client_queue.close(io),
- else => return err,
- }
- while (from_client_queue.getOne(io)) |msg| {
- switch (msg) {
- .connect => |*c| {
- std.debug.print("Message: {any}\n", .{msg});
- c.deinit(gpa);
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
+ defer msg.CONNECT.deinit(gpa);
+ try std.testing.expectEqualDeep(Message{
+ .CONNECT = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
},
- else => {
- std.debug.print("Message: {any}\n", .{msg});
- },
- }
- } else |_| {}
- }
+ }, msg);
+ }
- from_client_queue = .init(&from_client_buf);
- // Reset the reader to process it again.
- from_client.seek = 0;
-
- // {
- // const SemiClient = struct {
- // q: std.Io.Queue(Message),
-
- // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
- // defer std.debug.print("done parse\n", .{});
- // while (Message.next(gpa, in)) |msg| {
- // self.q.putOne(ioh, msg) catch return;
- // } else |_| {}
- // }
-
- // fn next(self: *@This(), ioh: std.Io) !Message {
- // return self.q.getOne(ioh);
- // }
-
- // fn printAll(self: *@This(), ioh: std.Io) void {
- // defer std.debug.print("done print\n", .{});
- // while (self.next(ioh)) |*msg| {
- // std.debug.print("Client msg: {any}\n", .{msg});
- // switch (msg.*) {
- // .connect => |c| {
- // c.deinit(gpa);
- // },
- // else => {},
- // }
- // } else |_| {}
- // }
- // };
-
- // var c: SemiClient = .{ .q = from_client_queue };
- // var group: std.Io.Group = .init;
- // defer group.wait(io);
-
- // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
- // @panic("could not start printAll\n");
- // };
-
- // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
- // @panic("could not start printAll\n");
- // };
- // }
-
- ////////
-
- // const connect = (Message.next(gpa, &from_client) catch unreachable).connect;
-
- // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa);
- // defer to_client_alloc.deinit();
- // var to_client = to_client_alloc.writer;
-
- // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client);
- // defer client.deinit(gpa);
-
- // {
- // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable;
- // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail");
-
- // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable;
- // defer timeout.cancel(io) catch {};
-
- // switch (try io.select(.{
- // .get_next = &get_next,
- // .timeout = &timeout,
- // })) {
- // .get_next => |next| {
- // std.debug.print("next is {any}\n", .{next});
- // try std.testing.expect((next catch |err| return err) == .ping);
- // },
- // .timeout => {
- // std.debug.print("reached timeout\n", .{});
- // return error.TestUnexpectedResult;
- // },
- // }
- // }
+ {
+ const msg = try client.next(gpa);
+ try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
+ }
+ }
}
diff --git a/src/server/message_parser.zig b/src/Server/message_parser.zig
index fd1b5b1..fd1b5b1 100644
--- a/src/server/message_parser.zig
+++ b/src/Server/message_parser.zig
diff --git a/src/main.zig b/src/main.zig
index 47992af..a413fba 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -6,6 +6,8 @@ const yazap = @import("yazap");
const Message = zits.MessageParser.Message;
const Server = zits.Server;
+const serverSubcommand = @import("./subcommand/server.zig").main;
+
pub fn main() !void {
var dba: std.heap.DebugAllocator(.{}) = .init;
defer _ = dba.deinit();
@@ -67,7 +69,7 @@ pub fn main() !void {
info.server_name = name;
}
- try @import("./server/main.zig").main(gpa, info);
+ try serverSubcommand(gpa, info);
return;
} else if (matches.subcommandMatches("pub")) |_| {
std.debug.print("Unimplemented\n", .{});
@@ -76,3 +78,25 @@ pub fn main() !void {
try app.displayHelp(io);
}
+
+pub const std_options: std.Options = .{
+ // By default, in safe build modes, the standard library will attach a segfault handler to the program to
+ // print a helpful stack trace if a segmentation fault occurs. Here, we can disable this, or even enable
+ // it in unsafe build modes.
+ .enable_segfault_handler = true,
+ // This is the logging function used by `std.log`.
+ .logFn = myLogFn,
+};
+
+fn myLogFn(
+ comptime level: std.log.Level,
+ comptime scope: @EnumLiteral(),
+ comptime format: []const u8,
+ args: anytype,
+) void {
+ if (scope == .zits) {
+ std.log.defaultLog(level, std.log.default_log_scope, format, args);
+ } else {
+ std.log.defaultLog(level, scope, format, args);
+ }
+}
diff --git a/src/root.zig b/src/root.zig
index 49631cb..d4c7cd8 100644
--- a/src/root.zig
+++ b/src/root.zig
@@ -1,3 +1 @@
-const MessageParser = @import("server/message_parser.zig");
-
-pub const Server = @import("server/Server.zig");
+pub const Server = @import("Server.zig");
diff --git a/src/server/main.zig b/src/subcommand/server.zig
index 1aaf572..1aaf572 100644
--- a/src/server/main.zig
+++ b/src/subcommand/server.zig