diff options
Diffstat (limited to 'src/server/Server.zig')
| -rw-r--r-- | src/server/Server.zig | 68 |
1 files changed, 54 insertions, 14 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig index eaecdf2..4049453 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -13,6 +13,7 @@ const Stream = std.Io.net.Stream; const message_parser = @import("./message_parser.zig"); pub const MessageType = message_parser.MessageType; pub const Message = message_parser.Message; +const HotMessageManager = message_parser.HotMessageManager; const ServerInfo = Message.ServerInfo; pub const Client = @import("./Client.zig"); const Server = @This(); @@ -49,11 +50,15 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void { } pub fn start(server: *Server, io: Io, gpa: Allocator) !void { + var hot_msg_manager: HotMessageManager = .{ .io = io }; + defer hot_msg_manager.deinit(gpa); + var tcp_server = try IpAddress.listen(try IpAddress.parse( server.info.host, server.info.port, ), io, .{ .reuse_address = true }); defer tcp_server.deinit(io); + log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"}); log.debug("Server max payload: {d}", .{server.info.max_payload}); log.info("Server ID: {s}", .{server.info.server_id}); @@ -69,7 +74,14 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void { log.debug("Accepting next client", .{}); const stream = try tcp_server.accept(io); log.debug("Accepted connection {d}", .{id}); - _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch { + _ = client_group.concurrent(io, handleConnectionInfallible, .{ + server, + gpa, + io, + id, + stream, + &hot_msg_manager, + }) catch { log.err("Could not start concurrent handler for {d}", .{id}); stream.close(io); }; @@ -96,13 +108,27 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void { } } -fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void { - handleConnection(server, server_allocator, io, id, stream) catch |err| { +fn handleConnectionInfallible( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + hot_msg_manager: *HotMessageManager, +) void { + handleConnection(server, server_allocator, io, id, stream, hot_msg_manager) catch |err| { log.err("Failed processing client {d}: {any}", .{ id, err }); }; } -fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void { +fn handleConnection( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + hot_msg_manager: *HotMessageManager, +) !void { defer stream.close(io); // TODO: use a client allocator for things that should only live for as long as the client? @@ -129,8 +155,14 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us queue.close(io); while (queue.getOne(io)) |msg| { switch (msg) { - .msg => |m| m.deinit(server_allocator), - .hmsg => |h| h.deinit(server_allocator), + .msg => { + msg.msg.deinit(server_allocator); + hot_msg_manager.destroyMsg(msg.msg); + }, + .hmsg => { + msg.hmsg.msg.deinit(server_allocator); + hot_msg_manager.destroyMsg(msg.hmsg.msg); + }, else => {}, } } else |_| {} @@ -150,7 +182,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us defer client_task.cancel(io) catch {}; // Messages are owned by the server after they are received from the client - while (client.next(server_allocator)) |msg| { + while (client.next(server_allocator, hot_msg_manager)) |msg| { switch (msg) { .ping => { // Respond to ping with pong. @@ -158,18 +190,18 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us }, .@"pub", .hpub => { defer switch (msg) { - .@"pub" => |pb| pb.deinit(server_allocator), - .hpub => |hp| hp.deinit(server_allocator), + .@"pub" => msg.@"pub".deinit(server_allocator), + .hpub => msg.hpub.@"pub".deinit(server_allocator), else => unreachable, }; - try server.publishMessage(io, server_allocator, &client, msg); + try server.publishMessage(io, server_allocator, hot_msg_manager, &client, msg); }, .sub => |sub| { - defer sub.deinit(server_allocator); + // defer sub.deinit(server_allocator); try server.subscribe(io, server_allocator, id, sub); }, .unsub => |unsub| { - defer unsub.deinit(server_allocator); + // defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, .connect => |connect| { @@ -226,7 +258,14 @@ test subjectMatches { try expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { +fn publishMessage( + server: *Server, + io: Io, + alloc: Allocator, + hot_msg_manager: *HotMessageManager, + source_client: *Client, + msg: Message, +) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -250,13 +289,14 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli switch (msg) { .@"pub" => |pb| client.send(io, .{ - .msg = try pb.toMsg(alloc, subscription.sid), + .msg = try pb.toMsg(alloc, hot_msg_manager, subscription.sid), }) catch |err| switch (err) { error.Canceled => return err, else => {}, }, .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( alloc, + hot_msg_manager, subscription.sid, ) }) catch |err| switch (err) { error.Canceled => return err, |
