summaryrefslogtreecommitdiff
path: root/src/server/Server.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-04 20:25:30 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-04 20:57:31 -0500
commit69528a1b72bc578430e3a3e12d7cd63680986c29 (patch)
treebccc5fabd0e3966080cec90da9041156734719ee /src/server/Server.zig
parente81bcda9208a01daa87ab5f74fa27439c2fd05f3 (diff)
Probe for optimal network buffer size.
We want to match the underlying system socket buffer. Filling this buffer minimizes the number of syscalls we do. Larger would be a waste. Also changed parser to use enums that more closely match the NATS message types.
Diffstat (limited to 'src/server/Server.zig')
-rw-r--r--src/server/Server.zig172
1 files changed, 117 insertions, 55 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig
index eaecdf2..f7b849c 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -4,6 +4,7 @@ const ArrayList = std.ArrayList;
const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged;
const Io = std.Io;
+const Dir = Io.Dir;
const Group = Io.Group;
const IpAddress = std.Io.net.IpAddress;
const Mutex = Io.Mutex;
@@ -21,6 +22,7 @@ pub const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
+ queue: *Queue(Message),
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
@@ -63,13 +65,24 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
var client_group: Group = .init;
defer client_group.cancel(io);
+ const read_buffer_size, const write_buffer_size = getBufferSizes(io);
+ log.debug("read buf: {d} write buf: {d}", .{ read_buffer_size, write_buffer_size });
+
var id: usize = 0;
while (true) : (id +%= 1) {
if (server.clients.contains(id)) continue;
log.debug("Accepting next client", .{});
const stream = try tcp_server.accept(io);
log.debug("Accepted connection {d}", .{id});
- _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch {
+ _ = client_group.concurrent(io, handleConnectionInfallible, .{
+ server,
+ gpa,
+ io,
+ id,
+ stream,
+ read_buffer_size,
+ write_buffer_size,
+ }) catch {
log.err("Could not start concurrent handler for {d}", .{id});
stream.close(io);
};
@@ -96,13 +109,29 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void {
}
}
-fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void {
- handleConnection(server, server_allocator, io, id, stream) catch |err| {
+fn handleConnectionInfallible(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ r_buf_size: usize,
+ w_buf_size: usize,
+) void {
+ handleConnection(server, server_allocator, io, id, stream, r_buf_size, w_buf_size) catch |err| {
log.err("Failed processing client {d}: {any}", .{ id, err });
};
}
-fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void {
+fn handleConnection(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ r_buf_size: usize,
+ w_buf_size: usize,
+) !void {
defer stream.close(io);
// TODO: use a client allocator for things that should only live for as long as the client?
@@ -111,26 +140,27 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// messages when done processing them (usually outside the client process, ie: publish).
// Set up client writer
- // TODO: how many bytes can fit in a network write syscall? cat /proc/sys/net/core/wmem_max
- var w_buffer: [212992]u8 = undefined;
- var writer = stream.writer(io, &w_buffer);
+ const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size);
+ defer server_allocator.free(w_buffer);
+ var writer = stream.writer(io, w_buffer);
const out = &writer.interface;
// Set up client reader
- // TODO: how many bytes can fit in a network read syscall? cat /proc/sys/net/core/rmem_max
- var r_buffer: [212992]u8 = undefined;
- var reader = stream.reader(io, &r_buffer);
+ const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size);
+ defer server_allocator.free(r_buffer);
+ var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
// Set up buffer queue
- var qbuf: [r_buffer.len / @sizeOf(Message)]Message = undefined;
- var queue: Queue(Message) = .init(&qbuf);
+ const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message));
+ defer server_allocator.free(qbuf);
+ var queue: Queue(Message) = .init(qbuf);
defer {
queue.close(io);
while (queue.getOne(io)) |msg| {
switch (msg) {
- .msg => |m| m.deinit(server_allocator),
- .hmsg => |h| h.deinit(server_allocator),
+ .MSG => |m| m.deinit(server_allocator),
+ .HMSG => |h| h.deinit(server_allocator),
else => {},
}
} else |_| {}
@@ -144,7 +174,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client
- try queue.putOne(io, .{ .info = server.info });
+ try queue.putOne(io, .{ .INFO = server.info });
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator });
defer client_task.cancel(io) catch {};
@@ -152,27 +182,28 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// Messages are owned by the server after they are received from the client
while (client.next(server_allocator)) |msg| {
switch (msg) {
- .ping => {
+ .PING => {
// Respond to ping with pong.
- try client.send(io, .pong);
+ try client.send(io, .PONG);
},
- .@"pub", .hpub => {
+ .PUB, .HPUB => {
+ @branchHint(.likely);
defer switch (msg) {
- .@"pub" => |pb| pb.deinit(server_allocator),
- .hpub => |hp| hp.deinit(server_allocator),
+ .PUB => |pb| pb.deinit(server_allocator),
+ .HPUB => |hp| hp.deinit(server_allocator),
else => unreachable,
};
try server.publishMessage(io, server_allocator, &client, msg);
},
- .sub => |sub| {
+ .SUB => |sub| {
defer sub.deinit(server_allocator);
- try server.subscribe(io, server_allocator, id, sub);
+ try server.subscribe(io, server_allocator, id, &queue, sub);
},
- .unsub => |unsub| {
+ .UNSUB => |unsub| {
defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub);
},
- .connect => |connect| {
+ .CONNECT => |connect| {
if (client.connect) |*current| {
current.deinit(server_allocator);
}
@@ -227,53 +258,46 @@ test subjectMatches {
}
fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void {
- errdefer {
- if (source_client.connect) |c| {
- if (c.verbose) {
- source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
- }
+ defer if (source_client.connect) |c| {
+ if (c.verbose) {
+ source_client.send(io, .@"+OK") catch {};
}
- }
+ };
+
const subject = switch (msg) {
- .@"pub" => |pb| pb.subject,
- .hpub => |hp| hp.@"pub".subject,
+ .PUB => |pb| pb.subject,
+ .HPUB => |hp| hp.@"pub".subject,
else => unreachable,
};
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, subject)) {
- const client = server.clients.get(subscription.client_id) orelse {
- log.debug("Trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
- continue;
- };
-
switch (msg) {
- .@"pub" => |pb| client.send(io, .{
- .msg = try pb.toMsg(alloc, subscription.sid),
- }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
+ .PUB => |pb| {
+ try subscription.queue.putOne(io, .{
+ .MSG = try pb.toMsg(alloc, subscription.sid),
+ });
},
- .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
- alloc,
- subscription.sid,
- ) }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
+ .HPUB => |hp| {
+ try subscription.queue.putOne(io, .{
+ .HMSG = try hp.toHMsg(alloc, subscription.sid),
+ });
},
else => unreachable,
}
}
}
- if (source_client.connect) |c| {
- if (c.verbose) {
- source_client.send(io, .@"+ok") catch {};
- }
- }
}
-fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Sub) !void {
+fn subscribe(
+ server: *Server,
+ io: Io,
+ gpa: Allocator,
+ id: usize,
+ queue: *Queue(Message),
+ msg: Message.Sub,
+) !void {
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject);
@@ -284,10 +308,17 @@ fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Su
.subject = subject,
.client_id = id,
.sid = sid,
+ .queue = queue,
});
}
-fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Unsub) !void {
+fn unsubscribe(
+ server: *Server,
+ io: Io,
+ gpa: Allocator,
+ id: usize,
+ msg: Message.Unsub,
+) !void {
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const len = server.subscriptions.items.len;
@@ -301,5 +332,36 @@ fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.
}
}
+const parseUnsigned = std.fmt.parseUnsigned;
+
+fn getBufferSizes(io: Io) struct { usize, usize } {
+ const default_size = 4 * 1024;
+ const default = .{ default_size, default_size };
+
+ const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch {
+ log.err("couldn't open /proc/sys/net/core", .{});
+ return default;
+ };
+
+ var buf: [64]u8 = undefined;
+
+ const rmem_max = readBufferSize(io, dir, "rmem_max", &buf, default_size);
+ const wmem_max = readBufferSize(io, dir, "wmem_max", &buf, default_size);
+
+ return .{ rmem_max, wmem_max };
+}
+
+fn readBufferSize(io: Io, dir: anytype, filename: []const u8, buf: []u8, default: usize) usize {
+ const bytes = dir.readFile(io, filename, buf) catch |err| {
+ log.err("couldn't open {s}: {any}", .{ filename, err });
+ return default;
+ };
+
+ return parseUnsigned(usize, bytes[0 .. bytes.len - 1], 10) catch |err| {
+ log.err("couldn't parse {s}: {any}", .{ bytes[0 .. bytes.len - 1], err });
+ return default;
+ };
+}
+
pub const default_id = "server-id-123";
pub const default_name = "Zits Server";