From 96a3705069cf33a00ded143f876734c2a045cf1e Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Wed, 7 Jan 2026 17:26:10 -0500 Subject: starting zero alloc parsing --- src/Server/Client.zig | 192 +++++++++++++++++++++++--------------------------- 1 file changed, 88 insertions(+), 104 deletions(-) (limited to 'src/Server/Client.zig') diff --git a/src/Server/Client.zig b/src/Server/Client.zig index 9ec928c..047f38d 100644 --- a/src/Server/Client.zig +++ b/src/Server/Client.zig @@ -11,29 +11,23 @@ pub const Msgs = union(enum) { }; connect: ?Message.Connect, -// Used to own messages that we receive in our queues. -alloc: std.mem.Allocator, - -// Messages for this client to receive. -recv_queue: *Queue(Message), -msg_queue: *Queue(Msgs), +// Byte queue for this client to receive. +recv_queue: *Queue(u8), +// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes). +recv_queue_write_lock: std.Io.Mutex = .init, from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, - alloc: std.mem.Allocator, - recv_queue: *Queue(Message), - msg_queue: *Queue(Msgs), + recv_queue: *Queue(u8), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, - .alloc = alloc, .recv_queue = recv_queue, - .msg_queue = msg_queue, .from_client = in, .to_client = out, }; @@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { } pub fn start(self: *Client, io: std.Io) !void { - var msgs_buf: [1024]Msgs = undefined; - - var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); - errdefer _ = recv_msgs_task.cancel(io) catch {}; - - var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - errdefer _ = recv_proto_task.cancel(io) catch {}; - + std.debug.assert(self.to_client.buffer.len > 0); + std.debug.assert(self.to_client.end == 0); while (true) { - switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { - .msgs => |len_err| { - @branchHint(.likely); - const msgs = msgs_buf[0..try len_err]; - for (0..msgs.len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(self.alloc), - .HMSG => |h| h.deinit(self.alloc), - }; - errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(self.alloc); - }, - .HMSG => |h| { - h.deinit(self.alloc); - }, - }; - switch (msg) { - .MSG => |m| { - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - }, - ); - try m.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - .HMSG => |hmsg| { - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - }); - try hmsg.msg.payload.write(self.to_client); - try self.to_client.print("\r\n", .{}); - }, - } - } - recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; - }, - .proto => |msg_err| { - @branchHint(.unlikely); - const msg = try msg_err; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } - recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; - }, - } + self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1); try self.to_client.flush(); } + // while (true) { + // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + // .msgs => |len_err| { + // @branchHint(.likely); + // const msgs = msgs_buf[0..try len_err]; + // for (0..msgs.len) |i| { + // const msg = msgs[i]; + // defer switch (msg) { + // .MSG => |m| m.deinit(self.alloc), + // .HMSG => |h| h.deinit(self.alloc), + // }; + // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + // .MSG => |m| { + // m.deinit(self.alloc); + // }, + // .HMSG => |h| { + // h.deinit(self.alloc); + // }, + // }; + // switch (msg) { + // .MSG => |m| { + // try self.to_client.print( + // "MSG {s} {s} {s} {d}\r\n", + // .{ + // m.subject, + // m.sid, + // m.reply_to orelse "", + // m.payload.len, + // }, + // ); + // try m.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // .HMSG => |hmsg| { + // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{ + // hmsg.msg.subject, + // hmsg.msg.sid, + // hmsg.msg.reply_to orelse "", + // hmsg.header_bytes, + // hmsg.msg.payload.len, + // }); + // try hmsg.msg.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + // }, + // } + // } + // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable; + // }, + // .proto => |msg_err| { + // @branchHint(.unlikely); + // const msg = try msg_err; + // switch (msg) { + // .@"+OK" => { + // _ = try self.to_client.write("+OK\r\n"); + // }, + // .PONG => { + // _ = try self.to_client.write("PONG\r\n"); + // }, + // .INFO => |info| { + // _ = try self.to_client.write("INFO "); + // try std.json.Stringify.value(info, .{}, self.to_client); + // _ = try self.to_client.write("\r\n"); + // }, + // .@"-ERR" => |s| { + // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + // }, + // else => |m| { + // std.debug.panic("unimplemented write: {any}\n", .{m}); + // }, + // } + // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable; + // }, + // } + // try self.to_client.flush(); + // } } -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - switch (msg) { - .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }), - .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }), - else => try self.recv_queue.putOne(io, msg), - } +pub fn send(self: *Client, io: std.Io, msg: []const u8) !void { + try self.recv_queue.putAll(io, msg); } test send { @@ -148,19 +136,15 @@ test send { var buf: [1024]u8 = undefined; break :blk &buf; }); - var recv_queue: Queue(Message) = .init(&.{}); - var msgs_queue: Queue(Msgs) = .init(blk: { - var buf: [1]Msgs = undefined; - break :blk &buf; - }); - var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client); + var recv_queue: Queue(u8) = .init(&.{}); + var client: Client = .init(null, &recv_queue, undefined, &to_client); defer client.deinit(gpa); var c_task = try io.concurrent(Client.start, .{ &client, io }); defer c_task.cancel(io) catch {}; { - try client.send(io, .PONG); + try client.send(io, "PONG\r\n"); // Wait for the concurrent client task to write to the writer try io.sleep(.fromMilliseconds(1), .awake); try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered()); -- cgit