summaryrefslogtreecommitdiff
path: root/src/server/Server.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/Server.zig')
-rw-r--r--src/server/Server.zig68
1 files changed, 54 insertions, 14 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig
index eaecdf2..4049453 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -13,6 +13,7 @@ const Stream = std.Io.net.Stream;
const message_parser = @import("./message_parser.zig");
pub const MessageType = message_parser.MessageType;
pub const Message = message_parser.Message;
+const HotMessageManager = message_parser.HotMessageManager;
const ServerInfo = Message.ServerInfo;
pub const Client = @import("./Client.zig");
const Server = @This();
@@ -49,11 +50,15 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void {
}
pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
+ var hot_msg_manager: HotMessageManager = .{ .io = io };
+ defer hot_msg_manager.deinit(gpa);
+
var tcp_server = try IpAddress.listen(try IpAddress.parse(
server.info.host,
server.info.port,
), io, .{ .reuse_address = true });
defer tcp_server.deinit(io);
+
log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
log.debug("Server max payload: {d}", .{server.info.max_payload});
log.info("Server ID: {s}", .{server.info.server_id});
@@ -69,7 +74,14 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
log.debug("Accepting next client", .{});
const stream = try tcp_server.accept(io);
log.debug("Accepted connection {d}", .{id});
- _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch {
+ _ = client_group.concurrent(io, handleConnectionInfallible, .{
+ server,
+ gpa,
+ io,
+ id,
+ stream,
+ &hot_msg_manager,
+ }) catch {
log.err("Could not start concurrent handler for {d}", .{id});
stream.close(io);
};
@@ -96,13 +108,27 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void {
}
}
-fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void {
- handleConnection(server, server_allocator, io, id, stream) catch |err| {
+fn handleConnectionInfallible(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ hot_msg_manager: *HotMessageManager,
+) void {
+ handleConnection(server, server_allocator, io, id, stream, hot_msg_manager) catch |err| {
log.err("Failed processing client {d}: {any}", .{ id, err });
};
}
-fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void {
+fn handleConnection(
+ server: *Server,
+ server_allocator: Allocator,
+ io: Io,
+ id: usize,
+ stream: Stream,
+ hot_msg_manager: *HotMessageManager,
+) !void {
defer stream.close(io);
// TODO: use a client allocator for things that should only live for as long as the client?
@@ -129,8 +155,14 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
queue.close(io);
while (queue.getOne(io)) |msg| {
switch (msg) {
- .msg => |m| m.deinit(server_allocator),
- .hmsg => |h| h.deinit(server_allocator),
+ .msg => {
+ msg.msg.deinit(server_allocator);
+ hot_msg_manager.destroyMsg(msg.msg);
+ },
+ .hmsg => {
+ msg.hmsg.msg.deinit(server_allocator);
+ hot_msg_manager.destroyMsg(msg.hmsg.msg);
+ },
else => {},
}
} else |_| {}
@@ -150,7 +182,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
defer client_task.cancel(io) catch {};
// Messages are owned by the server after they are received from the client
- while (client.next(server_allocator)) |msg| {
+ while (client.next(server_allocator, hot_msg_manager)) |msg| {
switch (msg) {
.ping => {
// Respond to ping with pong.
@@ -158,18 +190,18 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
},
.@"pub", .hpub => {
defer switch (msg) {
- .@"pub" => |pb| pb.deinit(server_allocator),
- .hpub => |hp| hp.deinit(server_allocator),
+ .@"pub" => msg.@"pub".deinit(server_allocator),
+ .hpub => msg.hpub.@"pub".deinit(server_allocator),
else => unreachable,
};
- try server.publishMessage(io, server_allocator, &client, msg);
+ try server.publishMessage(io, server_allocator, hot_msg_manager, &client, msg);
},
.sub => |sub| {
- defer sub.deinit(server_allocator);
+ // defer sub.deinit(server_allocator);
try server.subscribe(io, server_allocator, id, sub);
},
.unsub => |unsub| {
- defer unsub.deinit(server_allocator);
+ // defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub);
},
.connect => |connect| {
@@ -226,7 +258,14 @@ test subjectMatches {
try expect(subjectMatches("foo.>", "foo.bar.baz"));
}
-fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void {
+fn publishMessage(
+ server: *Server,
+ io: Io,
+ alloc: Allocator,
+ hot_msg_manager: *HotMessageManager,
+ source_client: *Client,
+ msg: Message,
+) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
@@ -250,13 +289,14 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli
switch (msg) {
.@"pub" => |pb| client.send(io, .{
- .msg = try pb.toMsg(alloc, subscription.sid),
+ .msg = try pb.toMsg(alloc, hot_msg_manager, subscription.sid),
}) catch |err| switch (err) {
error.Canceled => return err,
else => {},
},
.hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
alloc,
+ hot_msg_manager,
subscription.sid,
) }) catch |err| switch (err) {
error.Canceled => return err,