diff options
Diffstat (limited to 'src/server/Server.zig')
| -rw-r--r-- | src/server/Server.zig | 41 |
1 files changed, 24 insertions, 17 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig index f7b849c..3fedfcb 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -16,13 +16,14 @@ pub const MessageType = message_parser.MessageType; pub const Message = message_parser.Message; const ServerInfo = Message.ServerInfo; pub const Client = @import("./Client.zig"); +const Msgs = Client.Msgs; const Server = @This(); pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, - queue: *Queue(Message), + queue: *Queue(Msgs), fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -152,29 +153,34 @@ fn handleConnection( const in = &reader.interface; // Set up buffer queue - const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message)); + const qbuf: []Message = try server_allocator.alloc(Message, 16); defer server_allocator.free(qbuf); - var queue: Queue(Message) = .init(qbuf); + var recv_queue: Queue(Message) = .init(qbuf); + defer recv_queue.close(io); + + const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128)); + defer server_allocator.free(mbuf); + var msgs_queue: Queue(Msgs) = .init(mbuf); defer { - queue.close(io); - while (queue.getOne(io)) |msg| { + msgs_queue.close(io); + while (msgs_queue.getOne(io)) |msg| { switch (msg) { .MSG => |m| m.deinit(server_allocator), .HMSG => |h| h.deinit(server_allocator), - else => {}, } } else |_| {} } // Create client - var client: Client = .init(null, &queue, in, out); + var client: Client = .init(null, &recv_queue, &msgs_queue, in, out); defer client.deinit(server_allocator); try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); // Do initial handshake with client - try queue.putOne(io, .{ .INFO = server.info }); + // try recv_queue.putOne(io, .PONG); + try recv_queue.putOne(io, .{ .INFO = server.info }); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); defer client_task.cancel(io) catch {}; @@ -186,18 +192,19 @@ fn handleConnection( // Respond to ping with pong. try client.send(io, .PONG); }, - .PUB, .HPUB => { + .PUB => |pb| { + @branchHint(.likely); + defer pb.deinit(server_allocator); + try server.publishMessage(io, server_allocator, &client, msg); + }, + .HPUB => |hp| { @branchHint(.likely); - defer switch (msg) { - .PUB => |pb| pb.deinit(server_allocator), - .HPUB => |hp| hp.deinit(server_allocator), - else => unreachable, - }; + defer hp.deinit(server_allocator); try server.publishMessage(io, server_allocator, &client, msg); }, .SUB => |sub| { defer sub.deinit(server_allocator); - try server.subscribe(io, server_allocator, id, &queue, sub); + try server.subscribe(io, server_allocator, id, &msgs_queue, sub); }, .UNSUB => |unsub| { defer unsub.deinit(server_allocator); @@ -295,7 +302,7 @@ fn subscribe( io: Io, gpa: Allocator, id: usize, - queue: *Queue(Message), + queue: *Queue(Msgs), msg: Message.Sub, ) !void { try server.subs_lock.lock(io); @@ -339,7 +346,7 @@ fn getBufferSizes(io: Io) struct { usize, usize } { const default = .{ default_size, default_size }; const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { - log.err("couldn't open /proc/sys/net/core", .{}); + log.warn("couldn't open /proc/sys/net/core", .{}); return default; }; |
