diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:12:51 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-01 05:22:03 +0000 |
| commit | 45bd63dbe1e427f51114edf5df8a1f86dd9fd1b1 (patch) | |
| tree | 94e42dc5d1057c394fdcb7a3035c6060bcf9eb89 /src/server/main.zig | |
| parent | 5dea33367ee56abee7377f6e016d941a518f33b6 (diff) | |
Actually fast again???
way faster than before even??
coder@08714a4174bb:~$ nats bench pub foo -s localhost:4223
05:12:23 Starting Core NATS publisher benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
05:12:23 [1] Starting Core NATS publisher, publishing 100,000 messages
Finished 0s [====================================================================================] 100%
NATS Core NATS publisher stats: 574,666 msgs/sec ~ 70 MiB/sec ~ 1.74us
So cool.
src/server/client.zig JJ: M src/server/main.zig JJ: JJ: Lines starting with "JJ:" (like this one) will be
removed.
Diffstat (limited to 'src/server/main.zig')
| -rw-r--r-- | src/server/main.zig | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index d9f4595..ecfb513 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const Message = @import("./message_parser.zig").Message; pub const ServerInfo = Message.ServerInfo; @@ -120,16 +121,16 @@ fn handleConnection( var client_allocator: std.heap.DebugAllocator(.{}) = .init; client_allocator.backing_allocator = server_allocator; defer _ = client_allocator.deinit(); - const allocator = client_allocator.allocator(); + const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; // Set up client writer - const w_buffer: []u8 = try allocator.alloc(u8, 1024); + const w_buffer: []u8 = try allocator.alloc(u8, 1024 * 10); defer allocator.free(w_buffer); var writer = stream.writer(io, w_buffer); const out = &writer.interface; // Set up client reader - const r_buffer: []u8 = try allocator.alloc(u8, 1024); + const r_buffer: []u8 = try allocator.alloc(u8, 1024 * 10); defer allocator.free(r_buffer); var reader = stream.reader(io, r_buffer); const in = &reader.interface; @@ -139,6 +140,14 @@ fn handleConnection( try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); + var qbuf: [1024]Message = undefined; + var queue: std.Io.Queue(Message) = .init(&qbuf); + + var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); + defer client_task.cancel(io) catch {}; + + try io.sleep(std.Io.Duration.fromMilliseconds(5), .real); + // Do initial handshake with client try client.send(io, .{ .info = server.info }); var connect_arena: std.heap.ArenaAllocator = .init(allocator); @@ -153,7 +162,7 @@ fn handleConnection( try client.send(io, .pong); }, .@"pub" => |pb| { - _ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb }); + try server.publishMessage(io, server_allocator, &client, pb); }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -167,7 +176,7 @@ fn handleConnection( } } else |err| switch (err) { error.EndOfStream => { - std.debug.print("Client {d} disconnected", .{}); + std.debug.print("Client {d} disconnected\n", .{id}); }, else => { return err; @@ -186,7 +195,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } -fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { +fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -194,7 +203,7 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl } } } - defer msg.deinit(gpa); + defer msg.deinit(alloc); for (server.subscriptions.items) |subscription| { if (subjectMatches(subscription.subject, msg.subject)) { const client = server.clients.get(subscription.client_id) orelse { @@ -202,10 +211,10 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl continue; }; client.send(io, .{ .msg = .{ - .subject = msg.subject, - .sid = subscription.sid, - .reply_to = msg.reply_to, - .payload = msg.payload, + .subject = try alloc.dupe(u8, msg.subject), + .sid = try alloc.dupe(u8, subscription.sid), + .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null, + .payload = try alloc.dupe(u8, msg.payload), } }) catch continue; } } |
