From 987dc492a6ad8e3b4bd2f369d676a2d588342543 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Thu, 1 Jan 2026 19:06:00 +0000 Subject: 97 mbps !!! super fast dosen't flush every message, pulls batches from the queue to send, and flushes at the end of each batch. batches are a min of 1 message, but may be more. --- src/server/client.zig | 11 ++++++----- src/server/main.zig | 7 ++++--- 2 files changed, 10 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/server/client.zig b/src/server/client.zig index 684a50f..f74e6b3 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -25,7 +25,7 @@ pub fn init( pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { self.recv_queue = queue; - var msgs: [16]Message = undefined; + var msgs: [8]Message = undefined; while (true) { const len = try queue.get(io, &msgs, 1); std.debug.assert(len <= msgs.len); @@ -49,6 +49,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io }, } } + try self.to_client.flush(); } } @@ -68,19 +69,20 @@ pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { fn writeOk(out: *std.Io.Writer) !void { _ = try out.write("+OK\r\n"); - try out.flush(); +} + +fn writeErr(out: *std.Io.Writer, msg: []const u8) !void { + _ = try out.print("-ERR '{s}'\r\n", .{msg}); } fn writePong(out: *std.Io.Writer) !void { _ = try out.write("PONG\r\n"); - try out.flush(); } pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void { _ = try out.write("INFO "); try std.json.Stringify.value(info, .{}, out); _ = try out.write("\r\n"); - try out.flush(); } fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { @@ -94,7 +96,6 @@ fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void { msg.payload, }, ); - try out.flush(); } test { diff --git a/src/server/main.zig b/src/server/main.zig index e622304..d8e8e61 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -126,12 +126,12 @@ fn handleConnection( const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; // Set up client writer - var w_buffer: [256]u8 = undefined; + var w_buffer: [1024]u8 = undefined; var writer = stream.writer(io, &w_buffer); const out = &writer.interface; // Set up client reader - var r_buffer: [256]u8 = undefined; + var r_buffer: [1024]u8 = undefined; var reader = stream.reader(io, &r_buffer); const in = &reader.interface; @@ -140,8 +140,9 @@ fn handleConnection( try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); - var qbuf: [16]Message = undefined; + var qbuf: [8]Message = undefined; var queue: std.Io.Queue(Message) = .init(&qbuf); + defer queue.close(io); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); defer client_task.cancel(io) catch {}; -- cgit