summaryrefslogtreecommitdiff
path: root/src/server/main.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 05:12:51 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 05:22:03 +0000
commit45bd63dbe1e427f51114edf5df8a1f86dd9fd1b1 (patch)
tree94e42dc5d1057c394fdcb7a3035c6060bcf9eb89 /src/server/main.zig
parent5dea33367ee56abee7377f6e016d941a518f33b6 (diff)
Actually fast again???
way faster than before even?? coder@08714a4174bb:~$ nats bench pub foo -s localhost:4223 05:12:23 Starting Core NATS publisher benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo] 05:12:23 [1] Starting Core NATS publisher, publishing 100,000 messages Finished 0s [====================================================================================] 100% NATS Core NATS publisher stats: 574,666 msgs/sec ~ 70 MiB/sec ~ 1.74us So cool. src/server/client.zig JJ: M src/server/main.zig JJ: JJ: Lines starting with "JJ:" (like this one) will be removed.
Diffstat (limited to 'src/server/main.zig')
-rw-r--r--src/server/main.zig31
1 files changed, 20 insertions, 11 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index d9f4595..ecfb513 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -1,4 +1,5 @@
const std = @import("std");
+const builtin = @import("builtin");
const Message = @import("./message_parser.zig").Message;
pub const ServerInfo = Message.ServerInfo;
@@ -120,16 +121,16 @@ fn handleConnection(
var client_allocator: std.heap.DebugAllocator(.{}) = .init;
client_allocator.backing_allocator = server_allocator;
defer _ = client_allocator.deinit();
- const allocator = client_allocator.allocator();
+ const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
// Set up client writer
- const w_buffer: []u8 = try allocator.alloc(u8, 1024);
+ const w_buffer: []u8 = try allocator.alloc(u8, 1024 * 10);
defer allocator.free(w_buffer);
var writer = stream.writer(io, w_buffer);
const out = &writer.interface;
// Set up client reader
- const r_buffer: []u8 = try allocator.alloc(u8, 1024);
+ const r_buffer: []u8 = try allocator.alloc(u8, 1024 * 10);
defer allocator.free(r_buffer);
var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
@@ -139,6 +140,14 @@ fn handleConnection(
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
+ var qbuf: [1024]Message = undefined;
+ var queue: std.Io.Queue(Message) = .init(&qbuf);
+
+ var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
+ defer client_task.cancel(io) catch {};
+
+ try io.sleep(std.Io.Duration.fromMilliseconds(5), .real);
+
// Do initial handshake with client
try client.send(io, .{ .info = server.info });
var connect_arena: std.heap.ArenaAllocator = .init(allocator);
@@ -153,7 +162,7 @@ fn handleConnection(
try client.send(io, .pong);
},
.@"pub" => |pb| {
- _ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb });
+ try server.publishMessage(io, server_allocator, &client, pb);
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -167,7 +176,7 @@ fn handleConnection(
}
} else |err| switch (err) {
error.EndOfStream => {
- std.debug.print("Client {d} disconnected", .{});
+ std.debug.print("Client {d} disconnected\n", .{id});
},
else => {
return err;
@@ -186,7 +195,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
-fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
+fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
@@ -194,7 +203,7 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl
}
}
}
- defer msg.deinit(gpa);
+ defer msg.deinit(alloc);
for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) {
const client = server.clients.get(subscription.client_id) orelse {
@@ -202,10 +211,10 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl
continue;
};
client.send(io, .{ .msg = .{
- .subject = msg.subject,
- .sid = subscription.sid,
- .reply_to = msg.reply_to,
- .payload = msg.payload,
+ .subject = try alloc.dupe(u8, msg.subject),
+ .sid = try alloc.dupe(u8, subscription.sid),
+ .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
+ .payload = try alloc.dupe(u8, msg.payload),
} }) catch continue;
}
}