diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:12:51 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:22:03 +0000 |
| commit | 45bd63dbe1e427f51114edf5df8a1f86dd9fd1b1 (patch) | |
| tree | 94e42dc5d1057c394fdcb7a3035c6060bcf9eb89 /src/server/client.zig | |
| parent | 5dea33367ee56abee7377f6e016d941a518f33b6 (diff) | |
Actually fast again???
way faster than before even??
coder@08714a4174bb:~$ nats bench pub foo -s localhost:4223
05:12:23 Starting Core NATS publisher benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
05:12:23 [1] Starting Core NATS publisher, publishing 100,000 messages
Finished 0s [====================================================================================] 100%
NATS Core NATS publisher stats: 574,666 msgs/sec ~ 70 MiB/sec ~ 1.74us
So cool.
src/server/client.zig JJ: M src/server/main.zig JJ: JJ: Lines starting with "JJ:" (like this one) will be
removed.
Diffstat (limited to 'src/server/client.zig')
| -rw-r--r-- | src/server/client.zig | 57 |
1 files changed, 34 insertions, 23 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 44de542..684a50f 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -5,7 +5,8 @@ const Client = @This(); connect: ?Message.Connect, -write_lock: std.Io.Mutex, +// Messages for this client to receive. +recv_queue: ?*std.Io.Queue(Message) = null, from_client: *std.Io.Reader, to_client: *std.Io.Writer, @@ -17,36 +18,46 @@ pub fn init( ) Client { return .{ .connect = connect, - .write_lock = .init, .from_client = in, .to_client = out, }; } -/// Return true if the value was put in the clients buffer to process, else false. -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - try self.write_lock.lock(io); - defer self.write_lock.unlock(io); - - switch (msg) { - .@"+ok" => { - try writeOk(self.to_client); - }, - .pong => { - try writePong(self.to_client); - }, - .info => |info| { - try writeInfo(self.to_client, info); - }, - .msg => |m| { - try writeMsg(self.to_client, m); - }, - else => { - std.debug.panic("unimplemented write", .{}); - }, +pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { + self.recv_queue = queue; + var msgs: [16]Message = undefined; + while (true) { + const len = try queue.get(io, &msgs, 1); + std.debug.assert(len <= msgs.len); + for (msgs[0..len]) |msg| { + switch (msg) { + .@"+ok" => { + try writeOk(self.to_client); + }, + .pong => { + try writePong(self.to_client); + }, + .info => |info| { + try writeInfo(self.to_client, info); + }, + .msg => |m| { + defer m.deinit(alloc); + try writeMsg(self.to_client, m); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + } } } +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + if (self.recv_queue) |queue| { + try queue.putOne(io, msg); + } else @panic("Must start() the client before sending it messages."); +} + pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { // std.debug.print("in client awaiting next message\n", .{}); // errdefer std.debug.print("actually it was canceled\n", .{}); |
