From 4bf064056cf8938c90fe5ac8ac374a6e998fbc27 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Fri, 2 Jan 2026 03:00:49 +0000 Subject: reorganize but crashing not sure why, seems like i'm using the right allocators everywhere? need to take another pass at this later. --- src/server/client.zig | 13 +++++++- src/server/main.zig | 33 ++++++++++---------- src/server/message_parser.zig | 70 ++++++++++++++++++++++++++----------------- 3 files changed, 70 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/server/client.zig b/src/server/client.zig index af48fb5..5db6ff1 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -23,9 +23,20 @@ pub fn init( }; } -pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { +pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { + if (self.connect) |c| { + c.deinit(alloc); + } + self.* = undefined; +} + +pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message), server_info: Message.ServerInfo) !void { self.recv_queue = queue; var msgs: [8]Message = undefined; + + // Do initial handshake with client + try queue.putOne(io, .{ .info = server_info }); + while (true) { const len = try queue.get(io, &msgs, 1); std.debug.assert(len <= msgs.len); diff --git a/src/server/main.zig b/src/server/main.zig index 044f551..aad842c 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -87,7 +87,6 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { defer client_group.cancel(io); var id: usize = 0; - // Run until SIGINT is handled, then exit gracefully while (true) : (id +%= 1) { std.debug.print("in server loop\n", .{}); if (server.clients.contains(id)) continue; @@ -140,10 +139,10 @@ fn handleConnection( ) !void { defer stream.close(io); - var client_allocator: std.heap.DebugAllocator(.{}) = .init; - client_allocator.backing_allocator = server_allocator; - defer _ = client_allocator.deinit(); - const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; + //var client_allocator: std.heap.DebugAllocator(.{}) = .init; + //client_allocator.backing_allocator = server_allocator; + //defer _ = client_allocator.deinit(); + //const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; // Set up client writer var w_buffer: [1024]u8 = undefined; @@ -157,6 +156,8 @@ fn handleConnection( // Create client var client: Client = .init(null, in, out); + defer client.deinit(server_allocator); + try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); @@ -172,17 +173,9 @@ fn handleConnection( } else |_| {} } - var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); + var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue, server.info }); defer client_task.cancel(io) catch {}; - try io.sleep(std.Io.Duration.fromMilliseconds(5), .real); - - // Do initial handshake with client - try client.send(io, .{ .info = server.info }); - var connect_arena: std.heap.ArenaAllocator = .init(allocator); - defer connect_arena.deinit(); - client.connect = (Message.next(connect_arena.allocator(), in) catch return).connect; - // Messages are owned by the server after they are received from the client while (client.next(server_allocator)) |msg| { switch (msg) { @@ -191,6 +184,7 @@ fn handleConnection( try client.send(io, .pong); }, .@"pub" => |pb| { + defer pb.deinit(server_allocator); try server.publishMessage(io, server_allocator, &client, pb); }, .sub => |sub| { @@ -199,6 +193,12 @@ fn handleConnection( .unsub => |unsub| { try server.unsubscribe(io, server_allocator, id, unsub); }, + .connect => |connect| { + if (client.connect) |*current| { + current.deinit(server_allocator); + } + client.connect = connect; + }, else => |e| { std.debug.panic("Unimplemented message: {any}\n", .{e}); }, @@ -213,8 +213,8 @@ fn handleConnection( } } -fn subjectMatches(expected: []const u8, actual: []const u8) bool { - return std.mem.eql(u8, expected, actual); +fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool { + return std.mem.eql(u8, sub_subject, pub_subject); } fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { @@ -225,7 +225,6 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ } } } - defer msg.deinit(alloc); try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |subscription| { diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index f9e20bb..af29c1e 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -102,24 +102,22 @@ pub const Message = union(MessageType) { } pub fn dupe(self: Connect, alloc: std.mem.Allocator) !Connect { - return .{ - .verbose = self.verbose, - .pedantic = self.pedantic, - .tls_required = self.tls_required, - .auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null, - .user = if (self.user) |u| try alloc.dupe(u8, u) else null, - .pass = if (self.pass) |p| try alloc.dupe(u8, p) else null, - .name = if (self.name) |n| try alloc.dupe(u8, n) else null, - .lang = self.lang, - .version = self.version, - .protocol = self.protocol, - .echo = self.echo, - .sig = if (self.sig) |s| try alloc.dupe(u8, s) else null, - .jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null, - .no_responders = self.no_responders, - .headers = self.headers, - .nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null, - }; + var res = self; + res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; + errdefer if (res.auth_token) |a| alloc.free(a); + res.user = if (self.user) |u| try alloc.dupe(u8, u) else null; + errdefer if (res.user) |u| alloc.free(u); + res.pass = if (self.pass) |p| try alloc.dupe(u8, p) else null; + errdefer if (res.pass) |p| alloc.free(p); + res.name = if (self.name) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.name) |n| alloc.free(n); + res.sig = if (self.sig) |s| try alloc.dupe(u8, s) else null; + errdefer if (res.sig) |s| alloc.free(s); + res.jwt = if (self.jwt) |j| try alloc.dupe(u8, j) else null; + errdefer if (res.jwt) |j| alloc.free(j); + res.nkey = if (self.nkey) |n| try alloc.dupe(u8, n) else null; + errdefer if (res.nkey) |n| alloc.free(n); + return res; } }; pub const Pub = struct { @@ -165,17 +163,21 @@ pub const Message = union(MessageType) { pub fn deinit(self: Msg, alloc: std.mem.Allocator) void { alloc.free(self.subject); alloc.free(self.sid); - alloc.free(self.payload); if (self.reply_to) |r| alloc.free(r); + alloc.free(self.payload); } pub fn dupe(self: Msg, alloc: std.mem.Allocator) !Msg { - return .{ - .subject = try alloc.dupe(u8, self.subject), - .sid = try alloc.dupe(u8, self.sid), - .reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null, - .payload = try alloc.dupe(u8, self.payload), - }; + var res: Msg = undefined; + res.subject = try alloc.dupe(u8, self.subject); + errdefer alloc.free(res.subject); + res.sid = try alloc.dupe(u8, self.sid); + errdefer alloc.free(res.sid); + res.reply_to = if (self.reply_to) |r| alloc.dupe(u8, r) else null; + errdefer if (res.reply_to) |r| alloc.free(r); + res.payload = try alloc.dupe(u8, self.payload); + errdefer alloc.free(res.payload); + return res; } }; @@ -221,11 +223,16 @@ pub const Message = union(MessageType) { switch (operation) { .connect => { + // for storing the json string + var connect_string_writer_allocating: std.Io.Writer.Allocating = try .initCapacity(alloc, 1024); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = connect_string_writer_allocating.writer; + + // for parsing the json string var connect_arena_allocator: std.heap.ArenaAllocator = .init(alloc); defer connect_arena_allocator.deinit(); const connect_allocator = connect_arena_allocator.allocator(); - const connect_string_writer_allocating: std.Io.Writer.Allocating = try .initCapacity(connect_allocator, 1024); - var connect_string_writer = connect_string_writer_allocating.writer; + try in.discardAll(1); // throw away space // Should read the next JSON object to the fixed buffer writer. @@ -234,7 +241,12 @@ pub const Message = union(MessageType) { try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' // TODO: should be CONNECTION allocator - const res = try std.json.parseFromSliceLeaky(Connect, connect_allocator, connect_string_writer.buffered(), .{ .allocate = .alloc_always }); + const res = try std.json.parseFromSliceLeaky( + Connect, + connect_allocator, + connect_string_writer.buffered(), + .{ .allocate = .alloc_always }, + ); return .{ .connect = try res.dupe(alloc) }; }, @@ -243,6 +255,7 @@ pub const Message = union(MessageType) { // Parse subject const subject: []const u8 = try readSubject(alloc, in); + errdefer alloc.free(subject); // Parse byte count const byte_count = blk: { @@ -267,6 +280,7 @@ pub const Message = union(MessageType) { const payload = blk: { const bytes = try alloc.alloc(u8, byte_count); + errdefer alloc.free(bytes); try in.readSliceAll(bytes); try expectStreamBytes(in, "\r\n"); break :blk bytes; -- cgit