diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-03 03:16:51 +0000 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-03 03:17:13 +0000 |
| commit | a4ec798521bda5564e2dc96c2184100116b54d28 (patch) | |
| tree | e591048a045c270345dbf828bcc16ab414eba2ed /src/server | |
| parent | 9e32d014c234f2bdded380de5193b05469739a70 (diff) | |
Fix parse errors, ownership errors.
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/main.zig | 29 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 110 |
2 files changed, 91 insertions, 48 deletions
diff --git a/src/server/main.zig b/src/server/main.zig index 0d90e55..4e47c10 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -10,6 +10,11 @@ const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, + + fn deinit(self: Subscription, alloc: std.mem.Allocator) void { + alloc.free(self.subject); + alloc.free(self.sid); + } }; info: ServerInfo, @@ -29,12 +34,10 @@ pub fn deinit(server: *Server, io: std.Io, alloc: std.mem.Allocator) void { server.subs_lock.lockUncancelable(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |sub| { - alloc.free(sub.sid); - alloc.free(sub.subject); + sub.deinit(alloc); } - server.subscriptions.shrinkAndFree(alloc, 0); - - server.clients.clearAndFree(alloc); + server.subscriptions.deinit(alloc); + server.clients.deinit(alloc); } pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void { @@ -118,8 +121,7 @@ fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: u const i = len - from_end - 1; const sub = server.subscriptions.items[i]; if (sub.client_id == id) { - allocator.free(sub.sid); - allocator.free(sub.subject); + sub.deinit(allocator); _ = server.subscriptions.swapRemove(i); } } @@ -205,9 +207,11 @@ fn handleConnection( try server.publishMessage(io, server_allocator, &client, msg); }, .sub => |sub| { + defer sub.deinit(server_allocator); try server.subscribe(io, server_allocator, id, sub); }, .unsub => |unsub| { + defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, .connect => |connect| { @@ -313,10 +317,14 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); + const subject = try gpa.dupe(u8, msg.subject); + errdefer gpa.free(subject); + const sid = try gpa.dupe(u8, msg.sid); + errdefer gpa.free(sid); try server.subscriptions.append(gpa, .{ - .subject = msg.subject, + .subject = subject, .client_id = id, - .sid = msg.sid, + .sid = sid, }); } @@ -328,8 +336,7 @@ fn unsubscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, m const i = len - from_end - 1; const sub = server.subscriptions.items[i]; if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) { - gpa.free(sub.sid); - gpa.free(sub.subject); + sub.deinit(gpa); _ = server.subscriptions.swapRemove(i); } } diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 3adf704..54149cb 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -235,8 +235,6 @@ pub const Message = union(enum) { break :blk .initBuffer(&buf); }; - std.log.debug("buffered: '{s}'", .{in.buffered()}); - while (in.peekByte()) |byte| { if (std.ascii.isUpper(byte)) { try operation_string.appendBounded(byte); @@ -362,6 +360,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { } try third.?.append(alloc, byte); in.toss(1); + continue :sw .in_third; }, .in_end => { try expectStreamBytes(in, "\r\n"); @@ -453,49 +452,86 @@ test parseSub { res, ); } + { + var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n"); + var res = try parseSub(std.testing.allocator, &in); + defer res.sub.deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ + .sub = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } } fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { - try in.discardAll(1); // throw away space + const States = enum { + before_first, + in_first, + after_first, + in_second, + in_end, + }; + var first: std.ArrayList(u8) = .empty; errdefer first.deinit(alloc); + var second: ?std.ArrayList(u8) = null; + defer if (second) |*s| s.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try first.append(alloc, byte); - in.toss(1); - } else |err| return err; - - while (in.peekByte()) |byte| { - if (!std.ascii.isWhitespace(byte) or byte == '\r') break; - in.toss(1); - } else |err| return err; - - if (try in.peekByte() == '\r') { - try expectStreamBytes(in, "\r\n"); - return .{ - .unsub = .{ - .sid = try first.toOwnedSlice(alloc), - }, - }; - } else { - var second: std.ArrayList(u8) = .empty; - defer second.deinit(alloc); - - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) break; - try second.append(alloc, byte); + sw: switch (@as(States, .before_first)) { + .before_first => { + const byte = try in.peekByte(); + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .before_first; + } + continue :sw .in_first; + }, + .in_first => { + const byte = try in.peekByte(); + if (!std.ascii.isWhitespace(byte)) { + try first.append(alloc, byte); + in.toss(1); + continue :sw .in_first; + } + continue :sw .after_first; + }, + .after_first => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (std.ascii.isWhitespace(byte)) { + in.toss(1); + continue :sw .after_first; + } + second = .empty; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try second.?.append(alloc, byte); in.toss(1); - } else |err| return err; - - try expectStreamBytes(in, "\r\n"); - return .{ - .unsub = .{ - .max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10), - .sid = try first.toOwnedSlice(alloc), - }, - }; + continue :sw .in_second; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, } + + return .{ + .unsub = .{ + .sid = try first.toOwnedSlice(alloc), + .max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null, + }, + }; } test parseUnsub { |
