summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-01 17:29:53 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 18:50:52 +0000
commit6f91d25db79d72ec7973730337d261701eaa8d84 (patch)
tree40a4839966465f491d3a294a22713dbcdd0ee075
parent86558986efff134680ac14aae605bbd80eaba4d5 (diff)
slow again :(push-xvmlpnyknryz
-rw-r--r--src/server/client.zig8
-rw-r--r--src/server/main.zig64
-rw-r--r--src/server/message_parser.zig2
3 files changed, 50 insertions, 24 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index 684a50f..e93c9bc 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -44,6 +44,9 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
defer m.deinit(alloc);
try writeMsg(self.to_client, m);
},
+ .@"-err" => |s| {
+ try writeErr(self.to_client, s);
+ },
else => |m| {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
@@ -71,6 +74,11 @@ fn writeOk(out: *std.Io.Writer) !void {
try out.flush();
}
+fn writeErr(out: *std.Io.Writer, msg: []const u8) !void {
+ _ = try out.print("-ERR '{s}'\r\n", .{msg});
+ try out.flush();
+}
+
fn writePong(out: *std.Io.Writer) !void {
_ = try out.write("PONG\r\n");
try out.flush();
diff --git a/src/server/main.zig b/src/server/main.zig
index e622304..609df5c 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -162,7 +162,23 @@ fn handleConnection(
try client.send(io, .pong);
},
.@"pub" => |pb| {
- try server.publishMessage(io, server_allocator, &client, pb);
+ var pub_task = io.async(publishMessage, .{ server, io, server_allocator, &client, pb });
+ defer pub_task.cancel(io) catch {};
+
+ var timeout_task = io.async(std.Io.sleep, .{ io, .fromMilliseconds(200), .real });
+ defer timeout_task.cancel(io) catch {};
+
+ switch (try io.select(.{
+ .publish = &pub_task,
+ .timeout = &timeout_task,
+ })) {
+ .publish => {
+ timeout_task.cancel(io) catch {};
+ },
+ .timeout => {
+ pub_task.cancel(io) catch {};
+ },
+ }
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -196,33 +212,35 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
}
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
- errdefer {
- if (source_client.connect) |c| {
- if (c.verbose) {
- source_client.send(io, .@"-err") catch {};
+ // errdefer {
+ // if (source_client.connect) |c| {
+ // if (c.verbose) {
+ // source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
+ // }
+ // }
+ // }
+ {
+ defer msg.deinit(alloc);
+ try server.subs_lock.lock(io);
+ defer server.subs_lock.unlock(io);
+ for (server.subscriptions.items) |subscription| {
+ if (subjectMatches(subscription.subject, msg.subject)) {
+ const client = server.clients.get(subscription.client_id) orelse {
+ std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
+ continue;
+ };
+ client.send(io, .{ .msg = .{
+ .subject = try alloc.dupe(u8, msg.subject),
+ .sid = try alloc.dupe(u8, subscription.sid),
+ .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
+ .payload = try alloc.dupe(u8, msg.payload),
+ } }) catch continue;
}
}
}
- defer msg.deinit(alloc);
- try server.subs_lock.lock(io);
- defer server.subs_lock.unlock(io);
- for (server.subscriptions.items) |subscription| {
- if (subjectMatches(subscription.subject, msg.subject)) {
- const client = server.clients.get(subscription.client_id) orelse {
- std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
- continue;
- };
- client.send(io, .{ .msg = .{
- .subject = try alloc.dupe(u8, msg.subject),
- .sid = try alloc.dupe(u8, subscription.sid),
- .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
- .payload = try alloc.dupe(u8, msg.payload),
- } }) catch continue;
- }
- }
if (source_client.connect) |c| {
if (c.verbose) {
- source_client.send(io, .@"+ok") catch {};
+ try source_client.send(io, .@"+ok");
}
}
}
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index b156dd6..233135d 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -43,7 +43,7 @@ pub const Message = union(MessageType) {
ping,
pong,
@"+ok": void,
- @"-err": void,
+ @"-err": []const u8,
pub const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,