summaryrefslogtreecommitdiff
path: root/src/Server/Client.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-07 17:26:10 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-07 23:19:19 -0500
commit96a3705069cf33a00ded143f876734c2a045cf1e (patch)
tree61fca89bddd554fb7f8e4800eddde965f7163b0b /src/Server/Client.zig
parente2a60c9427bfaf63149b4692459e86749553f755 (diff)
starting zero alloc parsing
Diffstat (limited to 'src/Server/Client.zig')
-rw-r--r--src/Server/Client.zig192
1 files changed, 88 insertions, 104 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
index 9ec928c..047f38d 100644
--- a/src/Server/Client.zig
+++ b/src/Server/Client.zig
@@ -11,29 +11,23 @@ pub const Msgs = union(enum) {
};
connect: ?Message.Connect,
-// Used to own messages that we receive in our queues.
-alloc: std.mem.Allocator,
-
-// Messages for this client to receive.
-recv_queue: *Queue(Message),
-msg_queue: *Queue(Msgs),
+// Byte queue for this client to receive.
+recv_queue: *Queue(u8),
+// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes).
+recv_queue_write_lock: std.Io.Mutex = .init,
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
- alloc: std.mem.Allocator,
- recv_queue: *Queue(Message),
- msg_queue: *Queue(Msgs),
+ recv_queue: *Queue(u8),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
return .{
.connect = connect,
- .alloc = alloc,
.recv_queue = recv_queue,
- .msg_queue = msg_queue,
.from_client = in,
.to_client = out,
};
@@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
}
pub fn start(self: *Client, io: std.Io) !void {
- var msgs_buf: [1024]Msgs = undefined;
-
- var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
- errdefer _ = recv_msgs_task.cancel(io) catch {};
-
- var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
- errdefer _ = recv_proto_task.cancel(io) catch {};
-
+ std.debug.assert(self.to_client.buffer.len > 0);
+ std.debug.assert(self.to_client.end == 0);
while (true) {
- switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
- .msgs => |len_err| {
- @branchHint(.likely);
- const msgs = msgs_buf[0..try len_err];
- for (0..msgs.len) |i| {
- const msg = msgs[i];
- defer switch (msg) {
- .MSG => |m| m.deinit(self.alloc),
- .HMSG => |h| h.deinit(self.alloc),
- };
- errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
- .MSG => |m| {
- m.deinit(self.alloc);
- },
- .HMSG => |h| {
- h.deinit(self.alloc);
- },
- };
- switch (msg) {
- .MSG => |m| {
- try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
- },
- );
- try m.payload.write(self.to_client);
- try self.to_client.print("\r\n", .{});
- },
- .HMSG => |hmsg| {
- try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
- hmsg.msg.subject,
- hmsg.msg.sid,
- hmsg.msg.reply_to orelse "",
- hmsg.header_bytes,
- hmsg.msg.payload.len,
- });
- try hmsg.msg.payload.write(self.to_client);
- try self.to_client.print("\r\n", .{});
- },
- }
- }
- recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
- },
- .proto => |msg_err| {
- @branchHint(.unlikely);
- const msg = try msg_err;
- switch (msg) {
- .@"+OK" => {
- _ = try self.to_client.write("+OK\r\n");
- },
- .PONG => {
- _ = try self.to_client.write("PONG\r\n");
- },
- .INFO => |info| {
- _ = try self.to_client.write("INFO ");
- try std.json.Stringify.value(info, .{}, self.to_client);
- _ = try self.to_client.write("\r\n");
- },
- .@"-ERR" => |s| {
- _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
- },
- else => |m| {
- std.debug.panic("unimplemented write: {any}\n", .{m});
- },
- }
- recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
- },
- }
+ self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1);
try self.to_client.flush();
}
+ // while (true) {
+ // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
+ // .msgs => |len_err| {
+ // @branchHint(.likely);
+ // const msgs = msgs_buf[0..try len_err];
+ // for (0..msgs.len) |i| {
+ // const msg = msgs[i];
+ // defer switch (msg) {
+ // .MSG => |m| m.deinit(self.alloc),
+ // .HMSG => |h| h.deinit(self.alloc),
+ // };
+ // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
+ // .MSG => |m| {
+ // m.deinit(self.alloc);
+ // },
+ // .HMSG => |h| {
+ // h.deinit(self.alloc);
+ // },
+ // };
+ // switch (msg) {
+ // .MSG => |m| {
+ // try self.to_client.print(
+ // "MSG {s} {s} {s} {d}\r\n",
+ // .{
+ // m.subject,
+ // m.sid,
+ // m.reply_to orelse "",
+ // m.payload.len,
+ // },
+ // );
+ // try m.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ // },
+ // .HMSG => |hmsg| {
+ // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
+ // hmsg.msg.subject,
+ // hmsg.msg.sid,
+ // hmsg.msg.reply_to orelse "",
+ // hmsg.header_bytes,
+ // hmsg.msg.payload.len,
+ // });
+ // try hmsg.msg.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ // },
+ // }
+ // }
+ // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
+ // },
+ // .proto => |msg_err| {
+ // @branchHint(.unlikely);
+ // const msg = try msg_err;
+ // switch (msg) {
+ // .@"+OK" => {
+ // _ = try self.to_client.write("+OK\r\n");
+ // },
+ // .PONG => {
+ // _ = try self.to_client.write("PONG\r\n");
+ // },
+ // .INFO => |info| {
+ // _ = try self.to_client.write("INFO ");
+ // try std.json.Stringify.value(info, .{}, self.to_client);
+ // _ = try self.to_client.write("\r\n");
+ // },
+ // .@"-ERR" => |s| {
+ // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ // },
+ // else => |m| {
+ // std.debug.panic("unimplemented write: {any}\n", .{m});
+ // },
+ // }
+ // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ // },
+ // }
+ // try self.to_client.flush();
+ // }
}
-pub fn send(self: *Client, io: std.Io, msg: Message) !void {
- switch (msg) {
- .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
- .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
- else => try self.recv_queue.putOne(io, msg),
- }
+pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
+ try self.recv_queue.putAll(io, msg);
}
test send {
@@ -148,19 +136,15 @@ test send {
var buf: [1024]u8 = undefined;
break :blk &buf;
});
- var recv_queue: Queue(Message) = .init(&.{});
- var msgs_queue: Queue(Msgs) = .init(blk: {
- var buf: [1]Msgs = undefined;
- break :blk &buf;
- });
- var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
+ var recv_queue: Queue(u8) = .init(&.{});
+ var client: Client = .init(null, &recv_queue, undefined, &to_client);
defer client.deinit(gpa);
var c_task = try io.concurrent(Client.start, .{ &client, io });
defer c_task.cancel(io) catch {};
{
- try client.send(io, .PONG);
+ try client.send(io, "PONG\r\n");
// Wait for the concurrent client task to write to the writer
try io.sleep(.fromMilliseconds(1), .awake);
try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());