diff options
| -rw-r--r-- | src/main.zig | 3 | ||||
| -rw-r--r-- | src/server/client.zig | 57 | ||||
| -rw-r--r-- | src/server/main.zig | 31 |
3 files changed, 56 insertions, 35 deletions
diff --git a/src/main.zig b/src/main.zig index 42a4d34..a1414c8 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const zits = @import("zits"); const yazap = @import("yazap"); @@ -8,7 +9,7 @@ const Server = zits.Server; pub fn main() !void { var dba: std.heap.DebugAllocator(.{}) = .init; defer _ = dba.deinit(); - const gpa = dba.allocator(); + const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else std.heap.smp_allocator; var app = yazap.App.init(gpa, "zits", "High performance NATS compatible client and server."); defer app.deinit(); diff --git a/src/server/client.zig b/src/server/client.zig index 44de542..684a50f 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -5,7 +5,8 @@ const Client = @This(); connect: ?Message.Connect, -write_lock: std.Io.Mutex, +// Messages for this client to receive. +recv_queue: ?*std.Io.Queue(Message) = null, from_client: *std.Io.Reader, to_client: *std.Io.Writer, @@ -17,36 +18,46 @@ pub fn init( ) Client { return .{ .connect = connect, - .write_lock = .init, .from_client = in, .to_client = out, }; } -/// Return true if the value was put in the clients buffer to process, else false. -pub fn send(self: *Client, io: std.Io, msg: Message) !void { - try self.write_lock.lock(io); - defer self.write_lock.unlock(io); - - switch (msg) { - .@"+ok" => { - try writeOk(self.to_client); - }, - .pong => { - try writePong(self.to_client); - }, - .info => |info| { - try writeInfo(self.to_client, info); - }, - .msg => |m| { - try writeMsg(self.to_client, m); - }, - else => { - std.debug.panic("unimplemented write", .{}); - }, +pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { + self.recv_queue = queue; + var msgs: [16]Message = undefined; + while (true) { + const len = try queue.get(io, &msgs, 1); + std.debug.assert(len <= msgs.len); + for (msgs[0..len]) |msg| { + switch (msg) { + .@"+ok" => { + try writeOk(self.to_client); + }, + .pong => { + try writePong(self.to_client); + }, + .info => |info| { + try writeInfo(self.to_client, info); + }, + .msg => |m| { + defer m.deinit(alloc); + try writeMsg(self.to_client, m); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + } } } +pub fn send(self: *Client, io: std.Io, msg: Message) !void { + if (self.recv_queue) |queue| { + try queue.putOne(io, msg); + } else @panic("Must start() the client before sending it messages."); +} + pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { // std.debug.print("in client awaiting next message\n", .{}); // errdefer std.debug.print("actually it was canceled\n", .{}); diff --git a/src/server/main.zig b/src/server/main.zig index d9f4595..ecfb513 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const Message = @import("./message_parser.zig").Message; pub const ServerInfo = Message.ServerInfo; @@ -120,16 +121,16 @@ fn handleConnection( var client_allocator: std.heap.DebugAllocator(.{}) = .init; client_allocator.backing_allocator = server_allocator; defer _ = client_allocator.deinit(); - const allocator = client_allocator.allocator(); + const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; // Set up client writer - const w_buffer: []u8 = try allocator.alloc(u8, 1024); + const w_buffer: []u8 = try allocator.alloc(u8, 1024 * 10); defer allocator.free(w_buffer); var writer = stream.writer(io, w_buffer); const out = &writer.interface; // Set up client reader - const r_buffer: []u8 = try allocator.alloc(u8, 1024); + const r_buffer: []u8 = try allocator.alloc(u8, 1024 * 10); defer allocator.free(r_buffer); var reader = stream.reader(io, r_buffer); const in = &reader.interface; @@ -139,6 +140,14 @@ fn handleConnection( try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); + var qbuf: [1024]Message = undefined; + var queue: std.Io.Queue(Message) = .init(&qbuf); + + var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); + defer client_task.cancel(io) catch {}; + + try io.sleep(std.Io.Duration.fromMilliseconds(5), .real); + // Do initial handshake with client try client.send(io, .{ .info = server.info }); var connect_arena: std.heap.ArenaAllocator = .init(allocator); @@ -153,7 +162,7 @@ fn handleConnection( try client.send(io, .pong); }, .@"pub" => |pb| { - _ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb }); + try server.publishMessage(io, server_allocator, &client, pb); }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -167,7 +176,7 @@ fn handleConnection( } } else |err| switch (err) { error.EndOfStream => { - std.debug.print("Client {d} disconnected", .{}); + std.debug.print("Client {d} disconnected\n", .{id}); }, else => { return err; @@ -186,7 +195,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool { return std.mem.eql(u8, expected, actual); } -fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { +fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -194,7 +203,7 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl } } } - defer msg.deinit(gpa); + defer msg.deinit(alloc); for (server.subscriptions.items) |subscription| { if (subjectMatches(subscription.subject, msg.subject)) { const client = server.clients.get(subscription.client_id) orelse { @@ -202,10 +211,10 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl continue; }; client.send(io, .{ .msg = .{ - .subject = msg.subject, - .sid = subscription.sid, - .reply_to = msg.reply_to, - .payload = msg.payload, + .subject = try alloc.dupe(u8, msg.subject), + .sid = try alloc.dupe(u8, subscription.sid), + .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null, + .payload = try alloc.dupe(u8, msg.payload), } }) catch continue; } } |
