From bd9829f6842f0c989389aa4ce9784ab6e3cb4ee5 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Sat, 3 Jan 2026 05:33:13 +0000 Subject: Organize things Making it easier to use the server as a library --- src/server/Client.zig | 221 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 src/server/Client.zig (limited to 'src/server/Client.zig') diff --git a/src/server/Client.zig b/src/server/Client.zig new file mode 100644 index 0000000..2ce3c38 --- /dev/null +++ b/src/server/Client.zig @@ -0,0 +1,221 @@ +const Message = @import("message_parser.zig").Message; +const std = @import("std"); + +const Client = @This(); + +connect: ?Message.Connect, + +// Messages for this client to receive. +recv_queue: *std.Io.Queue(Message), + +from_client: *std.Io.Reader, +to_client: *std.Io.Writer, + +pub fn init( + connect: ?Message.Connect, + recv_queue: *std.Io.Queue(Message), + in: *std.Io.Reader, + out: *std.Io.Writer, +) Client { + return .{ + .connect = connect, + .recv_queue = recv_queue, + .from_client = in, + .to_client = out, + }; +} + +pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { + if (self.connect) |c| { + c.deinit(alloc); + } + self.* = undefined; +} + +pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { + var msgs: [8]Message = undefined; + + while (true) { + const len = try self.recv_queue.get(io, &msgs, 1); + std.debug.assert(len <= msgs.len); + for (0..len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .msg => |m| m.deinit(alloc), + .hmsg => |h| h.deinit(alloc), + else => {}, + }; + errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { + .msg => |m| { + m.deinit(alloc); + }, + else => {}, + }; + switch (msg) { + .@"+ok" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .pong => { + _ = try self.to_client.write("PONG\r\n"); + }, + .info => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .msg => |m| { + @branchHint(.likely); + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + m.payload, + }, + ); + }, + .hmsg => |hmsg| { + @branchHint(.likely); + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + hmsg.msg.payload, + }); + }, + .@"-err" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + } + try self.to_client.flush(); + } +} + +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + try self.recv_queue.putOne(io, msg); +} + +pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { + return Message.next(allocator, self.from_client); +} + +test { + const io = std.testing.io; + const gpa = std.testing.allocator; + + var from_client: std.Io.Reader = .fixed( + "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++ + "PING\r\n", + ); + var from_client_buf: [1024]Message = undefined; + var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf); + + { + // Simulate stream + while (Message.next(gpa, &from_client)) |msg| { + try from_client_queue.putOne(io, msg); + } else |err| switch (err) { + error.EndOfStream => from_client_queue.close(io), + else => return err, + } + + while (from_client_queue.getOne(io)) |msg| { + switch (msg) { + .connect => |*c| { + std.debug.print("Message: {any}\n", .{msg}); + c.deinit(gpa); + }, + else => { + std.debug.print("Message: {any}\n", .{msg}); + }, + } + } else |_| {} + } + + from_client_queue = .init(&from_client_buf); + // Reset the reader to process it again. + from_client.seek = 0; + + // { + // const SemiClient = struct { + // q: std.Io.Queue(Message), + + // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { + // defer std.debug.print("done parse\n", .{}); + // while (Message.next(gpa, in)) |msg| { + // self.q.putOne(ioh, msg) catch return; + // } else |_| {} + // } + + // fn next(self: *@This(), ioh: std.Io) !Message { + // return self.q.getOne(ioh); + // } + + // fn printAll(self: *@This(), ioh: std.Io) void { + // defer std.debug.print("done print\n", .{}); + // while (self.next(ioh)) |*msg| { + // std.debug.print("Client msg: {any}\n", .{msg}); + // switch (msg.*) { + // .connect => |c| { + // c.deinit(gpa); + // }, + // else => {}, + // } + // } else |_| {} + // } + // }; + + // var c: SemiClient = .{ .q = from_client_queue }; + // var group: std.Io.Group = .init; + // defer group.wait(io); + + // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { + // @panic("could not start printAll\n"); + // }; + + // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { + // @panic("could not start printAll\n"); + // }; + // } + + //////// + + // const connect = (Message.next(gpa, &from_client) catch unreachable).connect; + + // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa); + // defer to_client_alloc.deinit(); + // var to_client = to_client_alloc.writer; + + // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client); + // defer client.deinit(gpa); + + // { + // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable; + // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail"); + + // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable; + // defer timeout.cancel(io) catch {}; + + // switch (try io.select(.{ + // .get_next = &get_next, + // .timeout = &timeout, + // })) { + // .get_next => |next| { + // std.debug.print("next is {any}\n", .{next}); + // try std.testing.expect((next catch |err| return err) == .ping); + // }, + // .timeout => { + // std.debug.print("reached timeout\n", .{}); + // return error.TestUnexpectedResult; + // }, + // } + // } +} -- cgit