diff options
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/client.zig | 115 | ||||
| -rw-r--r-- | src/server/main.zig | 37 |
2 files changed, 77 insertions, 75 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 5db6ff1..23a0c9d 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -112,28 +112,17 @@ test { { // Simulate stream while (Message.next(gpa, &from_client)) |msg| { - switch (msg) { - .eos => { - try from_client_queue.putOne(io, msg); - break; - }, - else => { - try from_client_queue.putOne(io, msg); - }, - } + try from_client_queue.putOne(io, msg); } else |err| switch (err) { - error.EndOfStream => try from_client_queue.close(io), + error.EndOfStream => from_client_queue.close(io), else => return err, } while (from_client_queue.getOne(io)) |msg| { switch (msg) { - .eos => { - break; - }, .connect => |*c| { std.debug.print("Message: {any}\n", .{msg}); - c.deinit(); + c.deinit(gpa); }, else => { std.debug.print("Message: {any}\n", .{msg}); @@ -146,63 +135,47 @@ test { // Reset the reader to process it again. from_client.seek = 0; - { - const SemiClient = struct { - q: std.Io.Queue(Message), - - fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { - defer std.debug.print("done parse\n", .{}); - while (Message.next(gpa, in)) |msg| { - switch (msg) { - .eos => { - self.q.putOne(ioh, msg) catch return; - break; - }, - else => { - self.q.putOne(ioh, msg) catch return; - }, - } - } else |_| {} - } - - fn next(self: *@This(), ioh: std.Io) !Message { - return self.q.getOne(ioh); - } - - fn printAll(self: *@This(), ioh: std.Io) void { - defer std.debug.print("done print\n", .{}); - while (self.next(ioh)) |*msg| { - switch (msg.*) { - .eos => |_| { - break; - }, - else => {}, - } - std.debug.print("Client msg: {any}\n", .{msg}); - switch (msg.*) { - .connect => |c| { - // c.allocator.deinit(); - c.deinit(); - // @constCast(c).deinit(); - }, - else => {}, - } - } else |_| {} - } - }; - - var c: SemiClient = .{ .q = from_client_queue }; - var group: std.Io.Group = .init; - defer group.wait(io); - - group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { - @panic("could not start printAll\n"); - }; - - group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { - @panic("could not start printAll\n"); - }; - } + // { + // const SemiClient = struct { + // q: std.Io.Queue(Message), + + // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { + // defer std.debug.print("done parse\n", .{}); + // while (Message.next(gpa, in)) |msg| { + // self.q.putOne(ioh, msg) catch return; + // } else |_| {} + // } + + // fn next(self: *@This(), ioh: std.Io) !Message { + // return self.q.getOne(ioh); + // } + + // fn printAll(self: *@This(), ioh: std.Io) void { + // defer std.debug.print("done print\n", .{}); + // while (self.next(ioh)) |*msg| { + // std.debug.print("Client msg: {any}\n", .{msg}); + // switch (msg.*) { + // .connect => |c| { + // c.deinit(gpa); + // }, + // else => {}, + // } + // } else |_| {} + // } + // }; + + // var c: SemiClient = .{ .q = from_client_queue }; + // var group: std.Io.Group = .init; + // defer group.wait(io); + + // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { + // @panic("could not start printAll\n"); + // }; + + // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { + // @panic("could not start printAll\n"); + // }; + // } //////// diff --git a/src/server/main.zig b/src/server/main.zig index 59dfa94..81c8fbd 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -83,7 +83,7 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { server.info.port, ), io, .{ .reuse_address = true }); defer tcp_server.deinit(io); - std.log.info("Server listening on {s}:{d}", .{server.info.host, server.info.port}); + std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port }); var client_group: std.Io.Group = .init; defer client_group.cancel(io); @@ -147,12 +147,12 @@ fn handleConnection( //const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; // Set up client writer - var w_buffer: [1024*16]u8 = undefined; + var w_buffer: [1024 * 16]u8 = undefined; var writer = stream.writer(io, &w_buffer); const out = &writer.interface; // Set up client reader - var r_buffer: [1024*16]u8 = undefined; + var r_buffer: [1024 * 16]u8 = undefined; var reader = stream.reader(io, &r_buffer); const in = &reader.interface; @@ -216,7 +216,36 @@ fn handleConnection( } fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool { - return std.mem.eql(u8, sub_subject, pub_subject); + // TODO: assert that sub_subject and pub_subject are valid. + var sub_iter = std.mem.splitScalar(u8, sub_subject, '.'); + var pub_iter = std.mem.splitScalar(u8, pub_subject, '.'); + + while (sub_iter.next()) |st| { + const pt = pub_iter.next() orelse return false; + + if (std.mem.eql(u8, st, ">")) return true; + + if (!std.mem.eql(u8, st, "*") and !std.mem.eql(u8, st, pt)) { + return false; + } + } + + return pub_iter.next() == null; +} + +test subjectMatches { + try std.testing.expect(subjectMatches("foo", "foo")); + try std.testing.expect(!subjectMatches("foo", "bar")); + + try std.testing.expect(subjectMatches("foo.*", "foo.bar")); + try std.testing.expect(!subjectMatches("foo.*", "foo")); + try std.testing.expect(!subjectMatches("foo.>", "foo")); + + // the wildcard subscriptions foo.*.quux and foo.> both match foo.bar.quux, but only the latter matches foo.bar.baz. + try std.testing.expect(subjectMatches("foo.*.quux", "foo.bar.quux")); + try std.testing.expect(subjectMatches("foo.>", "foo.bar.quux")); + try std.testing.expect(!subjectMatches("foo.*.quux", "foo.bar.baz")); + try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz")); } fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { |
