summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-02 15:19:19 +0000
committerRobby Zambito <contact@robbyzambito.me>2026-01-02 15:29:58 +0000
commit2be370e379959e2763e70851cf14ecfca07754fc (patch)
tree54412e021efef581490ebc985ca5b161f1faeee0 /src
parent539255adb1f0a681ff49ebfb9e1e3c7198ad5310 (diff)
Support subject patterns
clean up some tests
Diffstat (limited to 'src')
-rw-r--r--src/server/client.zig115
-rw-r--r--src/server/main.zig37
2 files changed, 77 insertions, 75 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index 5db6ff1..23a0c9d 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -112,28 +112,17 @@ test {
{
// Simulate stream
while (Message.next(gpa, &from_client)) |msg| {
- switch (msg) {
- .eos => {
- try from_client_queue.putOne(io, msg);
- break;
- },
- else => {
- try from_client_queue.putOne(io, msg);
- },
- }
+ try from_client_queue.putOne(io, msg);
} else |err| switch (err) {
- error.EndOfStream => try from_client_queue.close(io),
+ error.EndOfStream => from_client_queue.close(io),
else => return err,
}
while (from_client_queue.getOne(io)) |msg| {
switch (msg) {
- .eos => {
- break;
- },
.connect => |*c| {
std.debug.print("Message: {any}\n", .{msg});
- c.deinit();
+ c.deinit(gpa);
},
else => {
std.debug.print("Message: {any}\n", .{msg});
@@ -146,63 +135,47 @@ test {
// Reset the reader to process it again.
from_client.seek = 0;
- {
- const SemiClient = struct {
- q: std.Io.Queue(Message),
-
- fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
- defer std.debug.print("done parse\n", .{});
- while (Message.next(gpa, in)) |msg| {
- switch (msg) {
- .eos => {
- self.q.putOne(ioh, msg) catch return;
- break;
- },
- else => {
- self.q.putOne(ioh, msg) catch return;
- },
- }
- } else |_| {}
- }
-
- fn next(self: *@This(), ioh: std.Io) !Message {
- return self.q.getOne(ioh);
- }
-
- fn printAll(self: *@This(), ioh: std.Io) void {
- defer std.debug.print("done print\n", .{});
- while (self.next(ioh)) |*msg| {
- switch (msg.*) {
- .eos => |_| {
- break;
- },
- else => {},
- }
- std.debug.print("Client msg: {any}\n", .{msg});
- switch (msg.*) {
- .connect => |c| {
- // c.allocator.deinit();
- c.deinit();
- // @constCast(c).deinit();
- },
- else => {},
- }
- } else |_| {}
- }
- };
-
- var c: SemiClient = .{ .q = from_client_queue };
- var group: std.Io.Group = .init;
- defer group.wait(io);
-
- group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
- @panic("could not start printAll\n");
- };
-
- group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
- @panic("could not start printAll\n");
- };
- }
+ // {
+ // const SemiClient = struct {
+ // q: std.Io.Queue(Message),
+
+ // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
+ // defer std.debug.print("done parse\n", .{});
+ // while (Message.next(gpa, in)) |msg| {
+ // self.q.putOne(ioh, msg) catch return;
+ // } else |_| {}
+ // }
+
+ // fn next(self: *@This(), ioh: std.Io) !Message {
+ // return self.q.getOne(ioh);
+ // }
+
+ // fn printAll(self: *@This(), ioh: std.Io) void {
+ // defer std.debug.print("done print\n", .{});
+ // while (self.next(ioh)) |*msg| {
+ // std.debug.print("Client msg: {any}\n", .{msg});
+ // switch (msg.*) {
+ // .connect => |c| {
+ // c.deinit(gpa);
+ // },
+ // else => {},
+ // }
+ // } else |_| {}
+ // }
+ // };
+
+ // var c: SemiClient = .{ .q = from_client_queue };
+ // var group: std.Io.Group = .init;
+ // defer group.wait(io);
+
+ // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
+ // @panic("could not start printAll\n");
+ // };
+
+ // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
+ // @panic("could not start printAll\n");
+ // };
+ // }
////////
diff --git a/src/server/main.zig b/src/server/main.zig
index 59dfa94..81c8fbd 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -83,7 +83,7 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
server.info.port,
), io, .{ .reuse_address = true });
defer tcp_server.deinit(io);
- std.log.info("Server listening on {s}:{d}", .{server.info.host, server.info.port});
+ std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port });
var client_group: std.Io.Group = .init;
defer client_group.cancel(io);
@@ -147,12 +147,12 @@ fn handleConnection(
//const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
// Set up client writer
- var w_buffer: [1024*16]u8 = undefined;
+ var w_buffer: [1024 * 16]u8 = undefined;
var writer = stream.writer(io, &w_buffer);
const out = &writer.interface;
// Set up client reader
- var r_buffer: [1024*16]u8 = undefined;
+ var r_buffer: [1024 * 16]u8 = undefined;
var reader = stream.reader(io, &r_buffer);
const in = &reader.interface;
@@ -216,7 +216,36 @@ fn handleConnection(
}
fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool {
- return std.mem.eql(u8, sub_subject, pub_subject);
+ // TODO: assert that sub_subject and pub_subject are valid.
+ var sub_iter = std.mem.splitScalar(u8, sub_subject, '.');
+ var pub_iter = std.mem.splitScalar(u8, pub_subject, '.');
+
+ while (sub_iter.next()) |st| {
+ const pt = pub_iter.next() orelse return false;
+
+ if (std.mem.eql(u8, st, ">")) return true;
+
+ if (!std.mem.eql(u8, st, "*") and !std.mem.eql(u8, st, pt)) {
+ return false;
+ }
+ }
+
+ return pub_iter.next() == null;
+}
+
+test subjectMatches {
+ try std.testing.expect(subjectMatches("foo", "foo"));
+ try std.testing.expect(!subjectMatches("foo", "bar"));
+
+ try std.testing.expect(subjectMatches("foo.*", "foo.bar"));
+ try std.testing.expect(!subjectMatches("foo.*", "foo"));
+ try std.testing.expect(!subjectMatches("foo.>", "foo"));
+
+ // the wildcard subscriptions foo.*.quux and foo.> both match foo.bar.quux, but only the latter matches foo.bar.baz.
+ try std.testing.expect(subjectMatches("foo.*.quux", "foo.bar.quux"));
+ try std.testing.expect(subjectMatches("foo.>", "foo.bar.quux"));
+ try std.testing.expect(!subjectMatches("foo.*.quux", "foo.bar.baz"));
+ try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz"));
}
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {