summaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-03 03:16:51 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-03 03:17:13 +0000
commita4ec798521bda5564e2dc96c2184100116b54d28 (patch)
treee591048a045c270345dbf828bcc16ab414eba2ed /src/server
parent9e32d014c234f2bdded380de5193b05469739a70 (diff)
Fix parse errors, ownership errors.
Diffstat (limited to 'src/server')
-rw-r--r--src/server/main.zig29
-rw-r--r--src/server/message_parser.zig110
2 files changed, 91 insertions, 48 deletions
diff --git a/src/server/main.zig b/src/server/main.zig
index 0d90e55..4e47c10 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -10,6 +10,11 @@ const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
+
+ fn deinit(self: Subscription, alloc: std.mem.Allocator) void {
+ alloc.free(self.subject);
+ alloc.free(self.sid);
+ }
};
info: ServerInfo,
@@ -29,12 +34,10 @@ pub fn deinit(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
server.subs_lock.lockUncancelable(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |sub| {
- alloc.free(sub.sid);
- alloc.free(sub.subject);
+ sub.deinit(alloc);
}
- server.subscriptions.shrinkAndFree(alloc, 0);
-
- server.clients.clearAndFree(alloc);
+ server.subscriptions.deinit(alloc);
+ server.clients.deinit(alloc);
}
pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
@@ -118,8 +121,7 @@ fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: u
const i = len - from_end - 1;
const sub = server.subscriptions.items[i];
if (sub.client_id == id) {
- allocator.free(sub.sid);
- allocator.free(sub.subject);
+ sub.deinit(allocator);
_ = server.subscriptions.swapRemove(i);
}
}
@@ -205,9 +207,11 @@ fn handleConnection(
try server.publishMessage(io, server_allocator, &client, msg);
},
.sub => |sub| {
+ defer sub.deinit(server_allocator);
try server.subscribe(io, server_allocator, id, sub);
},
.unsub => |unsub| {
+ defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub);
},
.connect => |connect| {
@@ -313,10 +317,14 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
+ const subject = try gpa.dupe(u8, msg.subject);
+ errdefer gpa.free(subject);
+ const sid = try gpa.dupe(u8, msg.sid);
+ errdefer gpa.free(sid);
try server.subscriptions.append(gpa, .{
- .subject = msg.subject,
+ .subject = subject,
.client_id = id,
- .sid = msg.sid,
+ .sid = sid,
});
}
@@ -328,8 +336,7 @@ fn unsubscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, m
const i = len - from_end - 1;
const sub = server.subscriptions.items[i];
if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
- gpa.free(sub.sid);
- gpa.free(sub.subject);
+ sub.deinit(gpa);
_ = server.subscriptions.swapRemove(i);
}
}
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 3adf704..54149cb 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -235,8 +235,6 @@ pub const Message = union(enum) {
break :blk .initBuffer(&buf);
};
- std.log.debug("buffered: '{s}'", .{in.buffered()});
-
while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) {
try operation_string.appendBounded(byte);
@@ -362,6 +360,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}
try third.?.append(alloc, byte);
in.toss(1);
+ continue :sw .in_third;
},
.in_end => {
try expectStreamBytes(in, "\r\n");
@@ -453,49 +452,86 @@ test parseSub {
res,
);
}
+ {
+ var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n");
+ var res = try parseSub(std.testing.allocator, &in);
+ defer res.sub.deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{
+ .sub = .{
+ .subject = "foo.echo",
+ .queue_group = "q",
+ .sid = "10",
+ },
+ },
+ res,
+ );
+ }
}
fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
- try in.discardAll(1); // throw away space
+ const States = enum {
+ before_first,
+ in_first,
+ after_first,
+ in_second,
+ in_end,
+ };
+
var first: std.ArrayList(u8) = .empty;
errdefer first.deinit(alloc);
+ var second: ?std.ArrayList(u8) = null;
+ defer if (second) |*s| s.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try first.append(alloc, byte);
- in.toss(1);
- } else |err| return err;
-
- while (in.peekByte()) |byte| {
- if (!std.ascii.isWhitespace(byte) or byte == '\r') break;
- in.toss(1);
- } else |err| return err;
-
- if (try in.peekByte() == '\r') {
- try expectStreamBytes(in, "\r\n");
- return .{
- .unsub = .{
- .sid = try first.toOwnedSlice(alloc),
- },
- };
- } else {
- var second: std.ArrayList(u8) = .empty;
- defer second.deinit(alloc);
-
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- try second.append(alloc, byte);
+ sw: switch (@as(States, .before_first)) {
+ .before_first => {
+ const byte = try in.peekByte();
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .before_first;
+ }
+ continue :sw .in_first;
+ },
+ .in_first => {
+ const byte = try in.peekByte();
+ if (!std.ascii.isWhitespace(byte)) {
+ try first.append(alloc, byte);
+ in.toss(1);
+ continue :sw .in_first;
+ }
+ continue :sw .after_first;
+ },
+ .after_first => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ } else if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ continue :sw .after_first;
+ }
+ second = .empty;
+ continue :sw .in_second;
+ },
+ .in_second => {
+ const byte = try in.peekByte();
+ if (byte == '\r') {
+ continue :sw .in_end;
+ }
+ try second.?.append(alloc, byte);
in.toss(1);
- } else |err| return err;
-
- try expectStreamBytes(in, "\r\n");
- return .{
- .unsub = .{
- .max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10),
- .sid = try first.toOwnedSlice(alloc),
- },
- };
+ continue :sw .in_second;
+ },
+ .in_end => {
+ try expectStreamBytes(in, "\r\n");
+ },
}
+
+ return .{
+ .unsub = .{
+ .sid = try first.toOwnedSlice(alloc),
+ .max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null,
+ },
+ };
}
test parseUnsub {