diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 18:45:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 20:43:49 -0500 |
| commit | b87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch) | |
| tree | 00613d0d3f7178d0c5b974ce04a752443e9a816e /src | |
| parent | 025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff) | |
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src')
| -rw-r--r-- | src/Server.zig (renamed from src/server/Server.zig) | 15 | ||||
| -rw-r--r-- | src/Server/Client.zig (renamed from src/server/Client.zig) | 189 | ||||
| -rw-r--r-- | src/Server/message_parser.zig (renamed from src/server/message_parser.zig) | 0 | ||||
| -rw-r--r-- | src/main.zig | 26 | ||||
| -rw-r--r-- | src/root.zig | 4 | ||||
| -rw-r--r-- | src/subcommand/server.zig (renamed from src/server/main.zig) | 0 |
6 files changed, 124 insertions, 110 deletions
diff --git a/src/server/Server.zig b/src/Server.zig index 18214ae..e7d00b1 100644 --- a/src/server/Server.zig +++ b/src/Server.zig @@ -11,16 +11,18 @@ const Mutex = Io.Mutex; const Queue = Io.Queue; const Stream = std.Io.net.Stream; -const message_parser = @import("./message_parser.zig"); +pub const Client = @import("./Server/Client.zig"); + +const message_parser = @import("./Server/message_parser.zig"); + pub const MessageType = message_parser.MessageType; pub const Message = message_parser.Message; const ServerInfo = Message.ServerInfo; -pub const Client = @import("./Client.zig"); + const Msgs = Client.Msgs; const Server = @This(); const builtin = @import("builtin"); -const safe_build = builtin.mode == .Debug or builtin.mode == .ReleaseSafe; pub const Subscription = struct { subject: []const u8, @@ -39,7 +41,7 @@ pub const Subscription = struct { }; const eql = std.mem.eql; -const log = std.log; +const log = std.log.scoped(.zits); const panic = std.debug.panic; info: ServerInfo, @@ -147,7 +149,10 @@ fn handleConnection( var dba: std.heap.DebugAllocator(.{}) = .init; dba.backing_allocator = server_allocator; defer _ = dba.deinit(); - const alloc = if (safe_build) dba.allocator() else server_allocator; + const alloc = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) + dba.allocator() + else + server_allocator; // Set up client writer const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size); diff --git a/src/server/Client.zig b/src/Server/Client.zig index 690cabf..dff3534 100644 --- a/src/server/Client.zig +++ b/src/Server/Client.zig @@ -132,122 +132,109 @@ pub fn start(self: *Client, io: std.Io) !void { } pub fn send(self: *Client, io: std.Io, msg: Message) !void { - try self.recv_queue.putOne(io, msg); + switch (msg) { + .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), + .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), + else => try self.recv_queue.putOne(io, msg), + } +} + +test send { + const io = std.testing.io; + const gpa = std.testing.allocator; + + var to_client: std.Io.Writer = .fixed(blk: { + var buf: [1024]u8 = undefined; + break :blk &buf; + }); + var recv_queue: Queue(Message) = .init(&.{}); + var msgs_queue: Queue(Msgs) = .init(blk: { + var buf: [1]Msgs = undefined; + break :blk &buf; + }); + var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + defer client.deinit(gpa); + + var c_task = try io.concurrent(Client.start, .{ &client, io }); + defer c_task.cancel(io) catch {}; + + { + try client.send(io, .PONG); + // Wait for the concurrent client task to write to the writer + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); + } + + to_client.end = 0; + + { + const payload = "payload"; + const msg: Message.Msg = .{ + .sid = "1", + .subject = "subject", + .reply_to = "reply", + .payload = .{ + .len = payload.len, + .short = blk: { + var buf: [128]u8 = undefined; + @memcpy(buf[0..payload.len], payload); + break :blk buf; + }, + .long = null, + }, + }; + try client.send(io, .{ + // msg must be owned by the allocator the client uses + .MSG = try msg.dupe(gpa), + }); + try io.sleep(.fromMilliseconds(1), .awake); + try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered()); + } } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { return Message.next(allocator, self.from_client); } -test { - const io = std.testing.io; +test next { const gpa = std.testing.allocator; var from_client: std.Io.Reader = .fixed( - "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ + "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++ + "equired\":false,\"name\":\"NATS CLI Version v0.2." ++ + "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++ + "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ "PING\r\n", ); - var from_client_buf: [1024]Message = undefined; - var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf); + + var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined); { // Simulate stream - while (Message.next(gpa, &from_client)) |msg| { - try from_client_queue.putOne(io, msg); - } else |err| switch (err) { - error.EndOfStream => from_client_queue.close(io), - else => return err, - } - while (from_client_queue.getOne(io)) |msg| { - switch (msg) { - .connect => |*c| { - std.debug.print("Message: {any}\n", .{msg}); - c.deinit(gpa); + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg)); + defer msg.CONNECT.deinit(gpa); + try std.testing.expectEqualDeep(Message{ + .CONNECT = .{ + .verbose = false, + .pedantic = false, + .tls_required = false, + .name = "NATS CLI Version v0.2.4", + .lang = "go", + .version = "1.43.0", + .protocol = 1, + .echo = true, + .headers = true, + .no_responders = true, }, - else => { - std.debug.print("Message: {any}\n", .{msg}); - }, - } - } else |_| {} - } + }, msg); + } - from_client_queue = .init(&from_client_buf); - // Reset the reader to process it again. - from_client.seek = 0; - - // { - // const SemiClient = struct { - // q: std.Io.Queue(Message), - - // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { - // defer std.debug.print("done parse\n", .{}); - // while (Message.next(gpa, in)) |msg| { - // self.q.putOne(ioh, msg) catch return; - // } else |_| {} - // } - - // fn next(self: *@This(), ioh: std.Io) !Message { - // return self.q.getOne(ioh); - // } - - // fn printAll(self: *@This(), ioh: std.Io) void { - // defer std.debug.print("done print\n", .{}); - // while (self.next(ioh)) |*msg| { - // std.debug.print("Client msg: {any}\n", .{msg}); - // switch (msg.*) { - // .connect => |c| { - // c.deinit(gpa); - // }, - // else => {}, - // } - // } else |_| {} - // } - // }; - - // var c: SemiClient = .{ .q = from_client_queue }; - // var group: std.Io.Group = .init; - // defer group.wait(io); - - // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { - // @panic("could not start printAll\n"); - // }; - - // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { - // @panic("could not start printAll\n"); - // }; - // } - - //////// - - // const connect = (Message.next(gpa, &from_client) catch unreachable).connect; - - // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa); - // defer to_client_alloc.deinit(); - // var to_client = to_client_alloc.writer; - - // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client); - // defer client.deinit(gpa); - - // { - // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable; - // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail"); - - // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable; - // defer timeout.cancel(io) catch {}; - - // switch (try io.select(.{ - // .get_next = &get_next, - // .timeout = &timeout, - // })) { - // .get_next => |next| { - // std.debug.print("next is {any}\n", .{next}); - // try std.testing.expect((next catch |err| return err) == .ping); - // }, - // .timeout => { - // std.debug.print("reached timeout\n", .{}); - // return error.TestUnexpectedResult; - // }, - // } - // } + { + const msg = try client.next(gpa); + try std.testing.expectEqual(.PING, std.meta.activeTag(msg)); + } + } } diff --git a/src/server/message_parser.zig b/src/Server/message_parser.zig index fd1b5b1..fd1b5b1 100644 --- a/src/server/message_parser.zig +++ b/src/Server/message_parser.zig diff --git a/src/main.zig b/src/main.zig index 47992af..a413fba 100644 --- a/src/main.zig +++ b/src/main.zig @@ -6,6 +6,8 @@ const yazap = @import("yazap"); const Message = zits.MessageParser.Message; const Server = zits.Server; +const serverSubcommand = @import("./subcommand/server.zig").main; + pub fn main() !void { var dba: std.heap.DebugAllocator(.{}) = .init; defer _ = dba.deinit(); @@ -67,7 +69,7 @@ pub fn main() !void { info.server_name = name; } - try @import("./server/main.zig").main(gpa, info); + try serverSubcommand(gpa, info); return; } else if (matches.subcommandMatches("pub")) |_| { std.debug.print("Unimplemented\n", .{}); @@ -76,3 +78,25 @@ pub fn main() !void { try app.displayHelp(io); } + +pub const std_options: std.Options = .{ + // By default, in safe build modes, the standard library will attach a segfault handler to the program to + // print a helpful stack trace if a segmentation fault occurs. Here, we can disable this, or even enable + // it in unsafe build modes. + .enable_segfault_handler = true, + // This is the logging function used by `std.log`. + .logFn = myLogFn, +}; + +fn myLogFn( + comptime level: std.log.Level, + comptime scope: @EnumLiteral(), + comptime format: []const u8, + args: anytype, +) void { + if (scope == .zits) { + std.log.defaultLog(level, std.log.default_log_scope, format, args); + } else { + std.log.defaultLog(level, scope, format, args); + } +} diff --git a/src/root.zig b/src/root.zig index 49631cb..d4c7cd8 100644 --- a/src/root.zig +++ b/src/root.zig @@ -1,3 +1 @@ -const MessageParser = @import("server/message_parser.zig"); - -pub const Server = @import("server/Server.zig"); +pub const Server = @import("Server.zig"); diff --git a/src/server/main.zig b/src/subcommand/server.zig index 1aaf572..1aaf572 100644 --- a/src/server/main.zig +++ b/src/subcommand/server.zig |
