diff options
| -rw-r--r-- | src/server/client.zig | 8 | ||||
| -rw-r--r-- | src/server/main.zig | 64 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 2 |
3 files changed, 50 insertions, 24 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 684a50f..e93c9bc 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -44,6 +44,9 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io defer m.deinit(alloc); try writeMsg(self.to_client, m); }, + .@"-err" => |s| { + try writeErr(self.to_client, s); + }, else => |m| { std.debug.panic("unimplemented write: {any}\n", .{m}); }, @@ -71,6 +74,11 @@ fn writeOk(out: *std.Io.Writer) !void { try out.flush(); } +fn writeErr(out: *std.Io.Writer, msg: []const u8) !void { + _ = try out.print("-ERR '{s}'\r\n", .{msg}); + try out.flush(); +} + fn writePong(out: *std.Io.Writer) !void { _ = try out.write("PONG\r\n"); try out.flush(); diff --git a/src/server/main.zig b/src/server/main.zig index e622304..609df5c 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -162,7 +162,23 @@ fn handleConnection( try client.send(io, .pong); }, .@"pub" => |pb| { - try server.publishMessage(io, server_allocator, &client, pb); + var pub_task = io.async(publishMessage, .{ server, io, server_allocator, &client, pb }); + defer pub_task.cancel(io) catch {}; + + var timeout_task = io.async(std.Io.sleep, .{ io, .fromMilliseconds(200), .real }); + defer timeout_task.cancel(io) catch {}; + + switch (try io.select(.{ + .publish = &pub_task, + .timeout = &timeout_task, + })) { + .publish => { + timeout_task.cancel(io) catch {}; + }, + .timeout => { + pub_task.cancel(io) catch {}; + }, + } }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -196,33 +212,35 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool { } fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { - errdefer { - if (source_client.connect) |c| { - if (c.verbose) { - source_client.send(io, .@"-err") catch {}; + // errdefer { + // if (source_client.connect) |c| { + // if (c.verbose) { + // source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; + // } + // } + // } + { + defer msg.deinit(alloc); + try server.subs_lock.lock(io); + defer server.subs_lock.unlock(io); + for (server.subscriptions.items) |subscription| { + if (subjectMatches(subscription.subject, msg.subject)) { + const client = server.clients.get(subscription.client_id) orelse { + std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); + continue; + }; + client.send(io, .{ .msg = .{ + .subject = try alloc.dupe(u8, msg.subject), + .sid = try alloc.dupe(u8, subscription.sid), + .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null, + .payload = try alloc.dupe(u8, msg.payload), + } }) catch continue; } } } - defer msg.deinit(alloc); - try server.subs_lock.lock(io); - defer server.subs_lock.unlock(io); - for (server.subscriptions.items) |subscription| { - if (subjectMatches(subscription.subject, msg.subject)) { - const client = server.clients.get(subscription.client_id) orelse { - std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); - continue; - }; - client.send(io, .{ .msg = .{ - .subject = try alloc.dupe(u8, msg.subject), - .sid = try alloc.dupe(u8, subscription.sid), - .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null, - .payload = try alloc.dupe(u8, msg.payload), - } }) catch continue; - } - } if (source_client.connect) |c| { if (c.verbose) { - source_client.send(io, .@"+ok") catch {}; + try source_client.send(io, .@"+ok"); } } } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index b156dd6..233135d 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -43,7 +43,7 @@ pub const Message = union(MessageType) { ping, pong, @"+ok": void, - @"-err": void, + @"-err": []const u8, pub const ServerInfo = struct { /// The unique identifier of the NATS server. server_id: []const u8, |
