diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/server/Client.zig | 26 | ||||
| -rw-r--r-- | src/server/Server.zig | 68 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 185 |
3 files changed, 200 insertions, 79 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig index 53a66b9..2827b6a 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -1,5 +1,6 @@ const Message = @import("message_parser.zig").Message; const std = @import("std"); +const HotMessageManager = @import("message_parser.zig").HotMessageManager; const Client = @This(); @@ -26,7 +27,7 @@ pub fn init( } pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { - if (self.connect) |c| { + if (self.connect) |*c| { c.deinit(alloc); } self.* = undefined; @@ -41,13 +42,20 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { for (0..len) |i| { const msg = msgs[i]; defer switch (msg) { - .msg => |m| m.deinit(alloc), - .hmsg => |h| h.deinit(alloc), + .msg => { + msg.msg.deinit(alloc); + }, + .hmsg => { + msg.hmsg.msg.deinit(alloc); + }, else => {}, }; errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { - .msg => |m| { - m.deinit(alloc); + .msg => { + mg.msg.deinit(alloc); + }, + .hmsg => { + mg.hmsg.msg.deinit(alloc); }, else => {}, }; @@ -103,8 +111,12 @@ pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } -pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { - return Message.next(allocator, self.from_client); +pub fn next( + self: *Client, + allocator: std.mem.Allocator, + hot_msg_manager: *HotMessageManager, +) !Message { + return Message.next(allocator, self.from_client, hot_msg_manager); } test { diff --git a/src/server/Server.zig b/src/server/Server.zig index eaecdf2..4049453 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -13,6 +13,7 @@ const Stream = std.Io.net.Stream; const message_parser = @import("./message_parser.zig"); pub const MessageType = message_parser.MessageType; pub const Message = message_parser.Message; +const HotMessageManager = message_parser.HotMessageManager; const ServerInfo = Message.ServerInfo; pub const Client = @import("./Client.zig"); const Server = @This(); @@ -49,11 +50,15 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void { } pub fn start(server: *Server, io: Io, gpa: Allocator) !void { + var hot_msg_manager: HotMessageManager = .{ .io = io }; + defer hot_msg_manager.deinit(gpa); + var tcp_server = try IpAddress.listen(try IpAddress.parse( server.info.host, server.info.port, ), io, .{ .reuse_address = true }); defer tcp_server.deinit(io); + log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"}); log.debug("Server max payload: {d}", .{server.info.max_payload}); log.info("Server ID: {s}", .{server.info.server_id}); @@ -69,7 +74,14 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void { log.debug("Accepting next client", .{}); const stream = try tcp_server.accept(io); log.debug("Accepted connection {d}", .{id}); - _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch { + _ = client_group.concurrent(io, handleConnectionInfallible, .{ + server, + gpa, + io, + id, + stream, + &hot_msg_manager, + }) catch { log.err("Could not start concurrent handler for {d}", .{id}); stream.close(io); }; @@ -96,13 +108,27 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void { } } -fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void { - handleConnection(server, server_allocator, io, id, stream) catch |err| { +fn handleConnectionInfallible( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + hot_msg_manager: *HotMessageManager, +) void { + handleConnection(server, server_allocator, io, id, stream, hot_msg_manager) catch |err| { log.err("Failed processing client {d}: {any}", .{ id, err }); }; } -fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void { +fn handleConnection( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + hot_msg_manager: *HotMessageManager, +) !void { defer stream.close(io); // TODO: use a client allocator for things that should only live for as long as the client? @@ -129,8 +155,14 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us queue.close(io); while (queue.getOne(io)) |msg| { switch (msg) { - .msg => |m| m.deinit(server_allocator), - .hmsg => |h| h.deinit(server_allocator), + .msg => { + msg.msg.deinit(server_allocator); + hot_msg_manager.destroyMsg(msg.msg); + }, + .hmsg => { + msg.hmsg.msg.deinit(server_allocator); + hot_msg_manager.destroyMsg(msg.hmsg.msg); + }, else => {}, } } else |_| {} @@ -150,7 +182,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us defer client_task.cancel(io) catch {}; // Messages are owned by the server after they are received from the client - while (client.next(server_allocator)) |msg| { + while (client.next(server_allocator, hot_msg_manager)) |msg| { switch (msg) { .ping => { // Respond to ping with pong. @@ -158,18 +190,18 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us }, .@"pub", .hpub => { defer switch (msg) { - .@"pub" => |pb| pb.deinit(server_allocator), - .hpub => |hp| hp.deinit(server_allocator), + .@"pub" => msg.@"pub".deinit(server_allocator), + .hpub => msg.hpub.@"pub".deinit(server_allocator), else => unreachable, }; - try server.publishMessage(io, server_allocator, &client, msg); + try server.publishMessage(io, server_allocator, hot_msg_manager, &client, msg); }, .sub => |sub| { - defer sub.deinit(server_allocator); + // defer sub.deinit(server_allocator); try server.subscribe(io, server_allocator, id, sub); }, .unsub => |unsub| { - defer unsub.deinit(server_allocator); + // defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, .connect => |connect| { @@ -226,7 +258,14 @@ test subjectMatches { try expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { +fn publishMessage( + server: *Server, + io: Io, + alloc: Allocator, + hot_msg_manager: *HotMessageManager, + source_client: *Client, + msg: Message, +) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -250,13 +289,14 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli switch (msg) { .@"pub" => |pb| client.send(io, .{ - .msg = try pb.toMsg(alloc, subscription.sid), + .msg = try pb.toMsg(alloc, hot_msg_manager, subscription.sid), }) catch |err| switch (err) { error.Canceled => return err, else => {}, }, .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( alloc, + hot_msg_manager, subscription.sid, ) }) catch |err| switch (err) { error.Canceled => return err, diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 1e7527d..109174f 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -6,6 +6,7 @@ const StaticStringMap = std.StaticStringMap; const Io = std.Io; const AllocatingWriter = Io.Writer.Allocating; +const Mutex = Io.Mutex; const Reader = Io.Reader; const ascii = std.ascii; @@ -22,11 +23,11 @@ pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const Message = union(enum) { info: ServerInfo, connect: Connect, - @"pub": Pub, + @"pub": *Pub, hpub: HPub, sub: Sub, unsub: Unsub, - msg: Msg, + msg: *Msg, hmsg: HMsg, ping, pong, @@ -77,7 +78,7 @@ pub const Message = union(enum) { headers: ?bool = null, nkey: ?[]const u8 = null, - pub fn deinit(self: Connect, alloc: Allocator) void { + pub fn deinit(self: *Connect, alloc: Allocator) void { if (self.auth_token) |a| alloc.free(a); if (self.user) |u| alloc.free(u); if (self.pass) |p| alloc.free(p); @@ -87,6 +88,7 @@ pub const Message = union(enum) { if (self.sig) |s| alloc.free(s); if (self.jwt) |j| alloc.free(j); if (self.nkey) |n| alloc.free(n); + self.* = undefined; } pub fn dupe(self: Connect, alloc: Allocator) !Connect { @@ -118,46 +120,58 @@ pub const Message = union(enum) { /// The reply subject that subscribers can use to send a response back to the publisher/requestor. reply_to: ?[]const u8 = null, /// The message payload data. - payload: []const u8, + payload_len: usize, + payload: [128]u8, + payload_extra: []const u8 = &.{}, - pub fn deinit(self: Pub, alloc: Allocator) void { + pub fn deinit(self: *Pub, alloc: Allocator) void { alloc.free(self.subject); - alloc.free(self.payload); + alloc.free(self.payload_extra); if (self.reply_to) |r| alloc.free(r); + self.* = undefined; } - pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg { + pub fn toMsg( + self: Pub, + alloc: Allocator, + hot_msg_manager: *HotMessageManager, + sid: []const u8, + ) !*Msg { const res: Msg = .{ .subject = self.subject, .sid = sid, .reply_to = self.reply_to, + .payload_len = self.payload_len, .payload = self.payload, + .payload_extra = self.payload_extra, }; - return res.dupe(alloc); + return res.dupe(alloc, hot_msg_manager); } }; pub const HPub = struct { header_bytes: usize, - @"pub": Pub, + @"pub": *Pub, - pub fn deinit(self: HPub, alloc: Allocator) void { + pub fn deinit(self: *HPub, alloc: Allocator) void { self.@"pub".deinit(alloc); + self.* = undefined; } - pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg { + pub fn toHMsg(self: HPub, alloc: Allocator, hot_msg_manager: *HotMessageManager, sid: []const u8) !HMsg { return .{ .header_bytes = self.header_bytes, - .msg = try self.@"pub".toMsg(alloc, sid), + .msg = try self.@"pub".toMsg(alloc, hot_msg_manager, sid), }; } }; pub const HMsg = struct { header_bytes: usize, - msg: Msg, + msg: *Msg, - pub fn deinit(self: HMsg, alloc: Allocator) void { + pub fn deinit(self: *HMsg, alloc: Allocator) void { self.msg.deinit(alloc); + self.* = undefined; } pub fn dupe(self: HMsg, alloc: Allocator) !HMsg { @@ -174,10 +188,11 @@ pub const Message = union(enum) { /// A unique alphanumeric subscription ID, generated by the client. sid: []const u8, - pub fn deinit(self: Sub, alloc: Allocator) void { + pub fn deinit(self: *Sub, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); if (self.queue_group) |q| alloc.free(q); + self.* = undefined; } }; pub const Unsub = struct { @@ -186,33 +201,39 @@ pub const Message = union(enum) { /// A number of messages to wait for before automatically unsubscribing. max_msgs: ?usize = null, - pub fn deinit(self: Unsub, alloc: Allocator) void { + pub fn deinit(self: *Unsub, alloc: Allocator) void { alloc.free(self.sid); + self.* = undefined; } }; pub const Msg = struct { subject: []const u8, sid: []const u8, reply_to: ?[]const u8, - payload: []const u8, + payload_len: usize, + payload: [128]u8, + payload_extra: []const u8, - pub fn deinit(self: Msg, alloc: Allocator) void { + pub fn deinit(self: *Msg, alloc: Allocator) void { alloc.free(self.subject); alloc.free(self.sid); if (self.reply_to) |r| alloc.free(r); - alloc.free(self.payload); + alloc.free(self.payload_extra); + self.* = undefined; } - pub fn dupe(self: Msg, alloc: Allocator) !Msg { - var res: Msg = undefined; + pub fn dupe(self: Msg, alloc: Allocator, hot_msg_manager: *HotMessageManager) !*Msg { + var res = try hot_msg_manager.createMsg(alloc); res.subject = try alloc.dupe(u8, self.subject); errdefer alloc.free(res.subject); res.sid = try alloc.dupe(u8, self.sid); errdefer alloc.free(res.sid); res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null; errdefer if (res.reply_to) |r| alloc.free(r); - res.payload = try alloc.dupe(u8, self.payload); - errdefer alloc.free(res.payload); + res.payload_len = self.payload_len; + @memcpy(&res.payload, &self.payload); + res.payload_extra = try alloc.dupe(u8, self.payload_extra); + errdefer alloc.free(res.payload_extra); return res; } }; @@ -240,7 +261,11 @@ pub const Message = union(enum) { pub const parse = parseStaticStringMap; /// An error should be handled by cleaning up this connection. - pub fn next(alloc: Allocator, in: *Reader) !Message { + pub fn next( + alloc: Allocator, + in: *Reader, + hot_msg_manager: *HotMessageManager, + ) !Message { var operation_string: ArrayList(u8) = blk: { comptime var buf_len = 0; comptime { @@ -299,11 +324,11 @@ pub const Message = union(enum) { }, .@"pub" => { @branchHint(.likely); - return parsePub(alloc, in); + return parsePub(alloc, in, hot_msg_manager); }, .hpub => { @branchHint(.likely); - return parseHPub(alloc, in); + return parseHPub(alloc, in, hot_msg_manager); }, .ping => { try expectStreamBytes(in, "\r\n"); @@ -590,12 +615,15 @@ test parseUnsub { } } -fn parsePub(alloc: Allocator, in: *Reader) !Message { +fn parsePub(alloc: Allocator, in: *Reader, hot_msg_manager: *HotMessageManager) !Message { try in.discardAll(1); // throw away space + const res = try hot_msg_manager.createPub(alloc); + errdefer hot_msg_manager.destroyPub(res); + // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); + res.*.subject = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(res.*.subject); const States = enum { before_second, @@ -609,8 +637,6 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { defer second.deinit(alloc); var third: ?ArrayList(u8) = null; defer if (third) |*t| t.deinit(alloc); - var payload: AllocatingWriter = .init(alloc); - errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { @@ -658,25 +684,26 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message { }, } - const reply_to: ?[]const u8, const bytes: usize = + res.*.reply_to, res.*.payload_len = if (third) |t| .{ - try alloc.dupe(u8, second.items), + try second.toOwnedSlice(alloc), try parseUnsigned(usize, t.items, 10), } else .{ null, try parseUnsigned(usize, second.items, 10), }; - try in.streamExact(&payload.writer, bytes); + try in.readSliceAll(res.*.payload[0..@min(res.*.payload.len, res.*.payload_len)]); + { + const remaining_payload_bytes = res.*.payload_len - res.*.payload.len; + var payload: AllocatingWriter = .init(alloc); + errdefer payload.deinit(); + try in.streamExact(&payload.writer, remaining_payload_bytes); + res.*.payload_extra = try payload.toOwnedSlice(); + } try expectStreamBytes(in, "\r\n"); - return .{ - .@"pub" = .{ - .subject = subject, - .payload = try payload.toOwnedSlice(), - .reply_to = reply_to, - }, - }; + return .{ .@"pub" = res }; } test parsePub { @@ -736,12 +763,15 @@ test parsePub { } } -fn parseHPub(alloc: Allocator, in: *Reader) !Message { +fn parseHPub(alloc: Allocator, in: *Reader, hot_msg_manager: *HotMessageManager) !Message { try in.discardAll(1); // throw away space + const nested_pub = try hot_msg_manager.createPub(alloc); + errdefer hot_msg_manager.destroyPub(nested_pub); + // Parse subject - const subject: []const u8 = try readSubject(alloc, in, .@"pub"); - errdefer alloc.free(subject); + nested_pub.*.subject = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(nested_pub.*.subject); const States = enum { before_second, @@ -759,8 +789,6 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { defer third.deinit(alloc); var fourth: ?ArrayList(u8) = null; defer if (fourth) |*f| f.deinit(alloc); - var payload: AllocatingWriter = .init(alloc); - errdefer payload.deinit(); sw: switch (@as(States, .before_second)) { .before_second => { @@ -828,7 +856,12 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { }, } - const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = + var res: Message.HPub = .{ + .header_bytes = 0, + .@"pub" = nested_pub, + }; + + res.@"pub".*.reply_to, res.header_bytes, res.@"pub".*.payload_len = if (fourth) |f| .{ try alloc.dupe(u8, second.items), try parseUnsigned(usize, third.items, 10), @@ -839,19 +872,17 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message { try parseUnsigned(usize, third.items, 10), }; - try in.streamExact(&payload.writer, total_bytes); + try in.readSliceAll(res.@"pub".*.payload[0..@min(res.@"pub".*.payload.len, res.@"pub".*.payload_len)]); + { + const remaining_payload_bytes = res.@"pub".*.payload_len - res.@"pub".*.payload.len; + var payload: AllocatingWriter = .init(alloc); + errdefer payload.deinit(); + try in.streamExact(&payload.writer, remaining_payload_bytes); + res.@"pub".*.payload_extra = try payload.toOwnedSlice(); + } try expectStreamBytes(in, "\r\n"); - return .{ - .hpub = .{ - .header_bytes = header_bytes, - .@"pub" = .{ - .subject = subject, - .payload = try payload.toOwnedSlice(), - .reply_to = reply_to, - }, - }, - }; + return .{ .hpub = res }; } test parseHPub { @@ -976,6 +1007,44 @@ inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { } } +pub const HotMessageManager = struct { + pub_pool_lock: Mutex = .init, + pub_pool: std.heap.MemoryPool(Message.Pub) = .empty, + msg_pool_lock: Mutex = .init, + msg_pool: std.heap.MemoryPool(Message.Msg) = .empty, + // used for locking + io: Io, + + pub fn deinit(self: *HotMessageManager, alloc: Allocator) void { + self.pub_pool.deinit(alloc); + self.msg_pool.deinit(alloc); + } + + pub fn createPub(self: *HotMessageManager, alloc: Allocator) !*Message.Pub { + try self.pub_pool_lock.lock(self.io); + defer self.pub_pool_lock.unlock(self.io); + return self.pub_pool.create(alloc); + } + + pub fn destroyPub(self: *HotMessageManager, msg: *Message.Pub) void { + self.pub_pool_lock.lockUncancelable(self.io); + defer self.pub_pool_lock.unlock(self.io); + self.pub_pool.destroy(msg); + } + + pub fn createMsg(self: *HotMessageManager, alloc: Allocator) !*Message.Msg { + try self.msg_pool_lock.lock(self.io); + defer self.msg_pool_lock.unlock(self.io); + return self.msg_pool.create(alloc); + } + + pub fn destroyMsg(self: *HotMessageManager, msg: *Message.Msg) void { + self.msg_pool_lock.lockUncancelable(self.io); + defer self.msg_pool_lock.unlock(self.io); + self.msg_pool.destroy(msg); + } +}; + test "parsing a stream" { const alloc = std.testing.allocator; const expectEqualDeep = std.testing.expectEqualDeep; |
