diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 18:45:17 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-06 20:43:49 -0500 |
| commit | b87412ee66197d4c89f1fbf93b32fe63ed1c63ab (patch) | |
| tree | 00613d0d3f7178d0c5b974ce04a752443e9a816e /src/server/Client.zig | |
| parent | 025a5344c8c922a8f46c4ee0e73a00ce0c3c4790 (diff) | |
Restructuring
Add a bunch of tests for the client
Diffstat (limited to 'src/server/Client.zig')
| -rw-r--r-- | src/server/Client.zig | 253 |
1 files changed, 0 insertions, 253 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig deleted file mode 100644 index 690cabf..0000000 --- a/src/server/Client.zig +++ /dev/null @@ -1,253 +0,0 @@ -const Message = @import("message_parser.zig").Message; -const std = @import("std"); -const Queue = std.Io.Queue; - -const Client = @This(); - -pub const Msgs = union(enum) { - MSG: Message.Msg, - HMSG: Message.HMsg, -}; - -connect: ?Message.Connect, -// Used to own messages that we receive in our queues. -alloc: std.mem.Allocator, - -// Messages for this client to receive. -recv_queue: *Queue(Message), -msg_queue: *Queue(Msgs), - -from_client: *std.Io.Reader, -to_client: *std.Io.Writer, - -pub fn init( - connect: ?Message.Connect, - alloc: std.mem.Allocator, - recv_queue: *Queue(Message), - msg_queue: *Queue(Msgs), - in: *std.Io.Reader, - out: *std.Io.Writer, -) Client { - return .{ - .connect = connect, - .alloc = alloc, - .recv_queue = recv_queue, - .msg_queue = msg_queue, - .from_client = in, - .to_client = out, - }; -} - -pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { - if (self.connect) |c| { - c.deinit(alloc); - } - self.* = undefined; -} - -pub fn start(self: *Client, io: std.Io) !void { - var msgs_buf: [1024]Msgs = undefined; - - var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); - errdefer _ = recv_msgs_task.cancel(io) catch {}; - - var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - errdefer _ = recv_proto_task.cancel(io) catch {}; - - while (true) { - switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |len_err| { - @branchHint(.likely); - const msgs = msgs_buf[0..try len_err]; - for (0..msgs.len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(self.alloc), - .HMSG => |h| h.deinit(self.alloc), - }; - errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(self.alloc); - }, - .HMSG => |h| { - h.deinit(self.alloc); - }, - }; - switch (msg) { - .MSG => |m| { - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - }, - ); - try m.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - .HMSG => |hmsg| { - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - }); - try hmsg.msg.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - } - } - recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; - }, - .proto => |msg_err| { - @branchHint(.unlikely); - const msg = try msg_err; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } - recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - }, - } - try self.to_client.flush(); - } -} - -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - try self.recv_queue.putOne(io, msg); -} - -pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { - return Message.next(allocator, self.from_client); -} - -test { - const io = std.testing.io; - const gpa = std.testing.allocator; - - var from_client: std.Io.Reader = .fixed( - "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ - "PING\r\n", - ); - var from_client_buf: [1024]Message = undefined; - var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf); - - { - // Simulate stream - while (Message.next(gpa, &from_client)) |msg| { - try from_client_queue.putOne(io, msg); - } else |err| switch (err) { - error.EndOfStream => from_client_queue.close(io), - else => return err, - } - - while (from_client_queue.getOne(io)) |msg| { - switch (msg) { - .connect => |*c| { - std.debug.print("Message: {any}\n", .{msg}); - c.deinit(gpa); - }, - else => { - std.debug.print("Message: {any}\n", .{msg}); - }, - } - } else |_| {} - } - - from_client_queue = .init(&from_client_buf); - // Reset the reader to process it again. - from_client.seek = 0; - - // { - // const SemiClient = struct { - // q: std.Io.Queue(Message), - - // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { - // defer std.debug.print("done parse\n", .{}); - // while (Message.next(gpa, in)) |msg| { - // self.q.putOne(ioh, msg) catch return; - // } else |_| {} - // } - - // fn next(self: *@This(), ioh: std.Io) !Message { - // return self.q.getOne(ioh); - // } - - // fn printAll(self: *@This(), ioh: std.Io) void { - // defer std.debug.print("done print\n", .{}); - // while (self.next(ioh)) |*msg| { - // std.debug.print("Client msg: {any}\n", .{msg}); - // switch (msg.*) { - // .connect => |c| { - // c.deinit(gpa); - // }, - // else => {}, - // } - // } else |_| {} - // } - // }; - - // var c: SemiClient = .{ .q = from_client_queue }; - // var group: std.Io.Group = .init; - // defer group.wait(io); - - // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { - // @panic("could not start printAll\n"); - // }; - - // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { - // @panic("could not start printAll\n"); - // }; - // } - - //////// - - // const connect = (Message.next(gpa, &from_client) catch unreachable).connect; - - // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa); - // defer to_client_alloc.deinit(); - // var to_client = to_client_alloc.writer; - - // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client); - // defer client.deinit(gpa); - - // { - // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable; - // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail"); - - // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable; - // defer timeout.cancel(io) catch {}; - - // switch (try io.select(.{ - // .get_next = &get_next, - // .timeout = &timeout, - // })) { - // .get_next => |next| { - // std.debug.print("next is {any}\n", .{next}); - // try std.testing.expect((next catch |err| return err) == .ping); - // }, - // .timeout => { - // std.debug.print("reached timeout\n", .{}); - // return error.TestUnexpectedResult; - // }, - // } - // } -} |
