diff options
Diffstat (limited to 'src/Server.zig')
| -rw-r--r-- | src/Server.zig | 141 |
1 files changed, 84 insertions, 57 deletions
diff --git a/src/Server.zig b/src/Server.zig index 21eecb4..b5f9ee9 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -13,10 +13,11 @@ const Stream = std.Io.net.Stream; pub const Client = @import("./Server/Client.zig"); -pub const parse = @import("./Server/parse.zig"); +pub const message = @import("./Server/message.zig"); +const parse = message.parse; -const MessageType = parse.MessageType; -const Message = parse.Message; +const MessageType = message.Control; +const Message = message.Message; const ServerInfo = Message.ServerInfo; const Msgs = Client.Msgs; @@ -29,9 +30,8 @@ const Subscription = struct { client_id: usize, sid: []const u8, queue_group: ?[]const u8, - queue: *Queue(Msgs), - // used to alloc messages in the queue - alloc: Allocator, + queue_lock: *Mutex, + queue: *Queue(u8), fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -168,66 +168,57 @@ fn handleConnection( const in = &reader.interface; // Set up buffer queue - const qbuf: []Message = try alloc.alloc(Message, 16); + const qbuf: []u8 = try alloc.alloc(u8, r_buf_size); defer alloc.free(qbuf); - var recv_queue: Queue(Message) = .init(qbuf); + var recv_queue: Queue(u8) = .init(qbuf); defer recv_queue.close(io); - const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs)); - defer alloc.free(mbuf); - var msgs_queue: Queue(Msgs) = .init(mbuf); - defer { - msgs_queue.close(io); - while (msgs_queue.getOne(io)) |msg| { - switch (msg) { - .MSG => |m| m.deinit(alloc), - .HMSG => |h| h.deinit(alloc), - } - } else |_| {} - } - // Create client - var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out); + var client: Client = .init(null, &recv_queue, in, out); defer client.deinit(server_allocator); try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); // Do initial handshake with client - // try recv_queue.putOne(io, .PONG); - try recv_queue.putOne(io, .{ .INFO = server.info }); + _ = try out.write("INFO "); + try std.json.Stringify.value(server.info, .{}, out); + _ = try out.write("\r\n"); + try out.flush(); var client_task = try io.concurrent(Client.start, .{ &client, io }); defer client_task.cancel(io) catch {}; - while (client.next(server_allocator)) |ctrl| { + while (client.next()) |ctrl| { switch (ctrl) { .PING => { // Respond to ping with pong. try client.recv_queue_write_lock.lock(io); defer client.recv_queue_write_lock.unlock(io); - try client.send(io, "PONG\r\n"); + _ = try client.from_client.take(2); + try client.recv_queue.putAll(io, "PONG\r\n"); + // try client.send(io, "PONG\r\n"); }, .PUB => { @branchHint(.likely); - try server.publishMessage(io, server_allocator, &client, msg); + // log.debug("received a pub msg", .{}); + try server.publishMessage(io, server_allocator, &client, .@"pub"); }, .HPUB => { @branchHint(.likely); - try server.publishMessage(io, server_allocator, &client, msg); + try server.publishMessage(io, server_allocator, &client, .hpub); }, .SUB => { - try server.subscribe(io, server_allocator, client, id, sub); + try server.subscribe(io, server_allocator, &client, id); }, .UNSUB => { - defer unsub.deinit(server_allocator); - try server.unsubscribe(io, server_allocator, id, unsub); + try server.unsubscribe(io, server_allocator, client, id); }, .CONNECT => { if (client.connect) |*current| { current.deinit(server_allocator); } - client.connect = connect; + client.connect = try parse.connect(server_allocator, client.from_client); }, else => |e| { panic("Unimplemented message: {any}\n", .{e}); @@ -279,19 +270,26 @@ fn publishMessage( io: Io, alloc: Allocator, source_client: *Client, - msg: Message, + comptime pub_or_hpub: enum { @"pub", hpub }, ) !void { defer if (source_client.connect) |c| { if (c.verbose) { - source_client.send(io, .@"+OK") catch {}; + if (source_client.recv_queue_write_lock.lock(io)) |_| { + defer source_client.recv_queue_write_lock.unlock(io); + source_client.recv_queue.putAll(io, "+OK\r\n") catch {}; + } else |_| {} } }; - const subject = switch (msg) { - .PUB => |pb| pb.subject, - .HPUB => |hp| hp.@"pub".subject, - else => unreachable, - }; + _ = pub_or_hpub; + + const msg = try parse.@"pub"(source_client.from_client); + + // const subject = switch (pub_or_hpub) { + // .PUB => |pb| pb.subject, + // .HPUB => |hp| hp.@"pub".subject, + // else => unreachable, + // }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); var published_queue_groups: ArrayList([]const u8) = .empty; @@ -301,7 +299,7 @@ fn publishMessage( subs: for (0..server.subscriptions.items.len) |i| { const subscription = server.subscriptions.items[i]; - if (subjectMatches(subscription.subject, subject)) { + if (subjectMatches(subscription.subject, msg.subject)) { if (subscription.queue_group) |sg| { for (published_queue_groups.items) |g| { if (eql(u8, g, sg)) { @@ -314,19 +312,46 @@ fn publishMessage( // to prioritize other subscriptions in the queue next time. try published_queue_sub_idxs.append(alloc, i); } - switch (msg) { - .PUB => |pb| { - try subscription.queue.putOne(io, .{ - .MSG = try pb.toMsg(subscription.alloc, subscription.sid), - }); - }, - .HPUB => |hp| { - try subscription.queue.putOne(io, .{ - .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid), - }); + + const m = msg.toMsg(subscription.sid); + var msg_line_buf: [1024]u8 = undefined; + var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf); + + // try self.to_client.print( + // , + + // ); + // try m.payload.write(self.to_client); + // try self.to_client.print("\r\n", .{}); + try msg_line_writer.print( + "MSG {s} {s} {s} {d}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, }, - else => unreachable, - } + ); + + try subscription.queue_lock.lock(io); + defer subscription.queue_lock.unlock(io); + try subscription.queue.putAll(io, msg_line_writer.buffered()); + try subscription.queue.putAll(io, m.payload); + try subscription.queue.putAll(io, "\r\n"); + + // switch (msg) { + // .PUB => |pb| { + // try subscription.queue.putOne(io, .{ + // .MSG = try pb.toMsg(subscription.alloc, subscription.sid), + // }); + // }, + // .HPUB => |hp| { + // try subscription.queue.putOne(io, .{ + // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid), + // }); + // }, + // else => unreachable, + // } } } @@ -340,10 +365,11 @@ fn subscribe( server: *Server, io: Io, gpa: Allocator, - client: Client, + client: *Client, id: usize, - msg: Message.Sub, + // msg: Message.Sub, ) !void { + const msg = try parse.sub(client.from_client); try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const subject = try gpa.dupe(u8, msg.subject); @@ -357,8 +383,8 @@ fn subscribe( .client_id = id, .sid = sid, .queue_group = queue_group, - .queue = client.msg_queue, - .alloc = client.alloc, + .queue_lock = &client.recv_queue_write_lock, + .queue = client.recv_queue, }); } @@ -366,9 +392,10 @@ fn unsubscribe( server: *Server, io: Io, gpa: Allocator, + client: Client, id: usize, - msg: Message.Unsub, ) !void { + const msg = try parse.unsub(client.from_client); try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const len = server.subscriptions.items.len; |
