summaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2025-12-31 02:13:15 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-01 05:22:02 +0000
commit0f138e5984cbebe64bc398513597d62f4e879b05 (patch)
tree96f071539363a31160ef154398ebb63a1b6a9662 /src/server
parente60a566a7c61cacc09213a23a708ab2f7d78a3ac (diff)
some cleanup and freeing
Diffstat (limited to 'src/server')
-rw-r--r--src/server/client.zig3
-rw-r--r--src/server/main.zig29
-rw-r--r--src/server/message_parser.zig1
3 files changed, 23 insertions, 10 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index 70673c2..9acc3b1 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -68,9 +68,8 @@ pub const ClientState = struct {
}
/// Return true if the value was put in the clients buffer to process, else false.
- pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!bool {
+ pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!void {
try self.recv_queue.putOne(io, msg);
- return true;
}
pub fn next(self: *ClientState, allocator: std.mem.Allocator) !Message {
diff --git a/src/server/main.zig b/src/server/main.zig
index 0045c1c..f68e5d3 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -111,22 +111,35 @@ fn handleConnection(
switch (msg) {
.ping => {
// Respond to ping with pong.
- for (0..5) |_| {
- if (try client_state.send(io, .pong)) {
- break;
- }
- } else {}
+ try client_state.send(io, .pong);
},
- .@"pub" => |@"pub"| {
- try server.publishMessage(io, @"pub");
+ .@"pub" => |pb| {
+ defer {
+ allocator.free(pb.payload);
+ allocator.free(pb.subject);
+ if (pb.reply_to) |r| {
+ allocator.free(r);
+ }
+ }
+ try server.publishMessage(io, pb);
if (client_state.connect.connect.verbose) {
- _ = try client_state.send(io, .@"+ok");
+ try client_state.send(io, .@"+ok");
}
},
.sub => |sub| {
+ defer {
+ allocator.free(sub.subject);
+ allocator.free(sub.sid);
+ if (sub.queue_group) |q| {
+ allocator.free(q);
+ }
+ }
try server.subscribe(allocator, id, sub);
},
.unsub => |unsub| {
+ defer {
+ allocator.free(unsub.sid);
+ }
try server.unsubscribe(id, unsub);
},
else => |e| {
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 41f309a..d238120 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -206,6 +206,7 @@ pub const Message = union(MessageType) {
// Parse byte count
const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
+ defer byte_count_list.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");