summaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-04 20:25:30 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-04 20:57:31 -0500
commit69528a1b72bc578430e3a3e12d7cd63680986c29 (patch)
treebccc5fabd0e3966080cec90da9041156734719ee /src/server
parente81bcda9208a01daa87ab5f74fa27439c2fd05f3 (diff)
Probe for optimal network buffer size.
We want to match the underlying system socket buffer. Filling this buffer minimizes the number of syscalls we do. Larger would be a waste. Also changed parser to use enums that more closely match the NATS message types.
Diffstat (limited to 'src/server')
-rw-r--r--src/server/Client.zig18
-rw-r--r--src/server/Server.zig172
-rw-r--r--src/server/message_parser.zig66
3 files changed, 159 insertions, 97 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
index 53a66b9..1e93360 100644
--- a/src/server/Client.zig
+++ b/src/server/Client.zig
@@ -41,29 +41,29 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
for (0..len) |i| {
const msg = msgs[i];
defer switch (msg) {
- .msg => |m| m.deinit(alloc),
- .hmsg => |h| h.deinit(alloc),
+ .MSG => |m| m.deinit(alloc),
+ .HMSG => |h| h.deinit(alloc),
else => {},
};
errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
- .msg => |m| {
+ .MSG => |m| {
m.deinit(alloc);
},
else => {},
};
switch (msg) {
- .@"+ok" => {
+ .@"+OK" => {
_ = try self.to_client.write("+OK\r\n");
},
- .pong => {
+ .PONG => {
_ = try self.to_client.write("PONG\r\n");
},
- .info => |info| {
+ .INFO => |info| {
_ = try self.to_client.write("INFO ");
try std.json.Stringify.value(info, .{}, self.to_client);
_ = try self.to_client.write("\r\n");
},
- .msg => |m| {
+ .MSG => |m| {
@branchHint(.likely);
try self.to_client.print(
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
@@ -76,7 +76,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
},
);
},
- .hmsg => |hmsg| {
+ .HMSG => |hmsg| {
@branchHint(.likely);
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
hmsg.msg.subject,
@@ -87,7 +87,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
hmsg.msg.payload,
});
},
- .@"-err" => |s| {
+ .@"-ERR" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
else => |m| {
diff --git a/src/server/Server.zig b/src/server/Server.zig
index eaecdf2..f7b849c 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -4,6 +4,7 @@ const ArrayList = std.ArrayList;
const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged;
const Io = std.Io;
+const Dir = Io.Dir;
const Group = Io.Group;
const IpAddress = std.Io.net.IpAddress;
const Mutex = Io.Mutex;
@@ -21,6 +22,7 @@ pub const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
+ queue: *Queue(Message),
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
@@ -63,13 +65,24 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
var client_group: Group = .init;
defer client_group.cancel(io);
+ const read_buffer_size, const write_buffer_size = getBufferSizes(io);
+ log.debug("read buf: {d} write buf: {d}", .{ read_buffer_size, write_buffer_size });
+
var id: usize = 0;
while (true) : (id +%= 1) {
if (server.clients.contains(id)) continue;
log.debug("Accepting next client", .{});
const stream = try tcp_server.accept(io);
log.debug("Accepted connection {d}", .{id});
- _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch {
+ _ = client_group.concurrent(io, handleConnectionInfallible, .{
+ server,
+ gpa,
+ io,
+ id,
+ stream,
+ read_buffer_size,
+ write_buffer_size,
+ }) catch {
log.err("Could not start concurrent handler for {d}", .{id});
stream.close(io);
};
@@ -96,13 +109,29 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void {
}
}
-fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void {
- handleConnection(server, server_allocator, io, id, stream) catch |err| {
+fn handleConnectionInfallible(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ r_buf_size: usize,
+ w_buf_size: usize,
+) void {
+ handleConnection(server, server_allocator, io, id, stream, r_buf_size, w_buf_size) catch |err| {
log.err("Failed processing client {d}: {any}", .{ id, err });
};
}
-fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void {
+fn handleConnection(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ r_buf_size: usize,
+ w_buf_size: usize,
+) !void {
defer stream.close(io);
// TODO: use a client allocator for things that should only live for as long as the client?
@@ -111,26 +140,27 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// messages when done processing them (usually outside the client process, ie: publish).
// Set up client writer
- // TODO: how many bytes can fit in a network write syscall? cat /proc/sys/net/core/wmem_max
- var w_buffer: [212992]u8 = undefined;
- var writer = stream.writer(io, &w_buffer);
+ const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size);
+ defer server_allocator.free(w_buffer);
+ var writer = stream.writer(io, w_buffer);
const out = &writer.interface;
// Set up client reader
- // TODO: how many bytes can fit in a network read syscall? cat /proc/sys/net/core/rmem_max
- var r_buffer: [212992]u8 = undefined;
- var reader = stream.reader(io, &r_buffer);
+ const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size);
+ defer server_allocator.free(r_buffer);
+ var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
// Set up buffer queue
- var qbuf: [r_buffer.len / @sizeOf(Message)]Message = undefined;
- var queue: Queue(Message) = .init(&qbuf);
+ const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message));
+ defer server_allocator.free(qbuf);
+ var queue: Queue(Message) = .init(qbuf);
defer {
queue.close(io);
while (queue.getOne(io)) |msg| {
switch (msg) {
- .msg => |m| m.deinit(server_allocator),
- .hmsg => |h| h.deinit(server_allocator),
+ .MSG => |m| m.deinit(server_allocator),
+ .HMSG => |h| h.deinit(server_allocator),
else => {},
}
} else |_| {}
@@ -144,7 +174,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client
- try queue.putOne(io, .{ .info = server.info });
+ try queue.putOne(io, .{ .INFO = server.info });
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator });
defer client_task.cancel(io) catch {};
@@ -152,27 +182,28 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// Messages are owned by the server after they are received from the client
while (client.next(server_allocator)) |msg| {
switch (msg) {
- .ping => {
+ .PING => {
// Respond to ping with pong.
- try client.send(io, .pong);
+ try client.send(io, .PONG);
},
- .@"pub", .hpub => {
+ .PUB, .HPUB => {
+ @branchHint(.likely);
defer switch (msg) {
- .@"pub" => |pb| pb.deinit(server_allocator),
- .hpub => |hp| hp.deinit(server_allocator),
+ .PUB => |pb| pb.deinit(server_allocator),
+ .HPUB => |hp| hp.deinit(server_allocator),
else => unreachable,
};
try server.publishMessage(io, server_allocator, &client, msg);
},
- .sub => |sub| {
+ .SUB => |sub| {
defer sub.deinit(server_allocator);
- try server.subscribe(io, server_allocator, id, sub);
+ try server.subscribe(io, server_allocator, id, &queue, sub);
},
- .unsub => |unsub| {
+ .UNSUB => |unsub| {
defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub);
},
- .connect => |connect| {
+ .CONNECT => |connect| {
if (client.connect) |*current| {
current.deinit(server_allocator);
}
@@ -227,53 +258,46 @@ test subjectMatches {
}
fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void {
- errdefer {
- if (source_client.connect) |c| {
- if (c.verbose) {
- source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
- }
+ defer if (source_client.connect) |c| {
+ if (c.verbose) {
+ source_client.send(io, .@"+OK") catch {};
}
- }
+ };
+
const subject = switch (msg) {
- .@"pub" => |pb| pb.subject,
- .hpub => |hp| hp.@"pub".subject,
+ .PUB => |pb| pb.subject,
+ .HPUB => |hp| hp.@"pub".subject,
else => unreachable,
};
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, subject)) {
- const client = server.clients.get(subscription.client_id) orelse {
- log.debug("Trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
- continue;
- };
-
switch (msg) {
- .@"pub" => |pb| client.send(io, .{
- .msg = try pb.toMsg(alloc, subscription.sid),
- }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
+ .PUB => |pb| {
+ try subscription.queue.putOne(io, .{
+ .MSG = try pb.toMsg(alloc, subscription.sid),
+ });
},
- .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
- alloc,
- subscription.sid,
- ) }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
+ .HPUB => |hp| {
+ try subscription.queue.putOne(io, .{
+ .HMSG = try hp.toHMsg(alloc, subscription.sid),
+ });
},
else => unreachable,
}
}
}
- if (source_client.connect) |c| {
- if (c.verbose) {
- source_client.send(io, .@"+ok") catch {};
- }
- }
}
-fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Sub) !void {
+fn subscribe(
+ server: *Server,
+ io: Io,
+ gpa: Allocator,
+ id: usize,
+ queue: *Queue(Message),
+ msg: Message.Sub,
+) !void {
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject);
@@ -284,10 +308,17 @@ fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Su
.subject = subject,
.client_id = id,
.sid = sid,
+ .queue = queue,
});
}
-fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Unsub) !void {
+fn unsubscribe(
+ server: *Server,
+ io: Io,
+ gpa: Allocator,
+ id: usize,
+ msg: Message.Unsub,
+) !void {
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const len = server.subscriptions.items.len;
@@ -301,5 +332,36 @@ fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.
}
}
+const parseUnsigned = std.fmt.parseUnsigned;
+
+fn getBufferSizes(io: Io) struct { usize, usize } {
+ const default_size = 4 * 1024;
+ const default = .{ default_size, default_size };
+
+ const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch {
+ log.err("couldn't open /proc/sys/net/core", .{});
+ return default;
+ };
+
+ var buf: [64]u8 = undefined;
+
+ const rmem_max = readBufferSize(io, dir, "rmem_max", &buf, default_size);
+ const wmem_max = readBufferSize(io, dir, "wmem_max", &buf, default_size);
+
+ return .{ rmem_max, wmem_max };
+}
+
+fn readBufferSize(io: Io, dir: anytype, filename: []const u8, buf: []u8, default: usize) usize {
+ const bytes = dir.readFile(io, filename, buf) catch |err| {
+ log.err("couldn't open {s}: {any}", .{ filename, err });
+ return default;
+ };
+
+ return parseUnsigned(usize, bytes[0 .. bytes.len - 1], 10) catch |err| {
+ log.err("couldn't parse {s}: {any}", .{ bytes[0 .. bytes.len - 1], err });
+ return default;
+ };
+}
+
pub const default_id = "server-id-123";
pub const default_name = "Zits Server";
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 1e7527d..8b4859b 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -20,18 +20,18 @@ const log = std.log;
pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
pub const Message = union(enum) {
- info: ServerInfo,
- connect: Connect,
- @"pub": Pub,
- hpub: HPub,
- sub: Sub,
- unsub: Unsub,
- msg: Msg,
- hmsg: HMsg,
- ping,
- pong,
- @"+ok": void,
- @"-err": []const u8,
+ INFO: ServerInfo,
+ CONNECT: Connect,
+ PUB: Pub,
+ HPUB: HPub,
+ SUB: Sub,
+ UNSUB: Unsub,
+ MSG: Msg,
+ HMSG: HMsg,
+ PING,
+ PONG,
+ @"+OK": void,
+ @"-ERR": []const u8,
pub const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,
@@ -220,15 +220,15 @@ pub const Message = union(enum) {
const client_types = StaticStringMap(MessageType).initComptime(
.{
// {"INFO", .info},
- .{ "CONNECT", .connect },
- .{ "PUB", .@"pub" },
- .{ "HPUB", .hpub },
- .{ "SUB", .sub },
- .{ "UNSUB", .unsub },
+ .{ @tagName(.CONNECT), .CONNECT },
+ .{ @tagName(.PUB), .PUB },
+ .{ @tagName(.HPUB), .HPUB },
+ .{ @tagName(.SUB), .SUB },
+ .{ @tagName(.UNSUB), .UNSUB },
// {"MSG", .msg},
// {"HMSG", .hmsg},
- .{ "PING", .ping },
- .{ "PONG", .pong },
+ .{ @tagName(.PING), .PING },
+ .{ @tagName(.PONG), .PONG },
// {"+OK", .@"+ok"},
// {"-ERR", .@"-err"},
},
@@ -267,7 +267,7 @@ pub const Message = union(enum) {
errdefer log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) {
- .connect => {
+ .CONNECT => {
// for storing the json string
var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit();
@@ -295,28 +295,28 @@ pub const Message = union(enum) {
.{ .allocate = .alloc_always },
);
- return .{ .connect = try res.dupe(alloc) };
+ return .{ .CONNECT = try res.dupe(alloc) };
},
- .@"pub" => {
+ .PUB => {
@branchHint(.likely);
return parsePub(alloc, in);
},
- .hpub => {
+ .HPUB => {
@branchHint(.likely);
return parseHPub(alloc, in);
},
- .ping => {
+ .PING => {
try expectStreamBytes(in, "\r\n");
- return .ping;
+ return .PING;
},
- .pong => {
+ .PONG => {
try expectStreamBytes(in, "\r\n");
- return .pong;
+ return .PONG;
},
- .sub => {
+ .SUB => {
return parseSub(alloc, in);
},
- .unsub => {
+ .UNSUB => {
return parseUnsub(alloc, in);
},
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
@@ -385,7 +385,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message {
}
return .{
- .sub = .{
+ .SUB = .{
.subject = subject,
.queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
.sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
@@ -546,7 +546,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
}
return .{
- .unsub = .{
+ .UNSUB = .{
.sid = try first.toOwnedSlice(alloc),
.max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
},
@@ -671,7 +671,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
try expectStreamBytes(in, "\r\n");
return .{
- .@"pub" = .{
+ .PUB = .{
.subject = subject,
.payload = try payload.toOwnedSlice(),
.reply_to = reply_to,
@@ -843,7 +843,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
try expectStreamBytes(in, "\r\n");
return .{
- .hpub = .{
+ .HPUB = .{
.header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,