diff options
| -rw-r--r-- | src/server/Client.zig | 150 | ||||
| -rw-r--r-- | src/server/Server.zig | 41 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 60 |
3 files changed, 148 insertions, 103 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig index 1e93360..8ff92e8 100644 --- a/src/server/Client.zig +++ b/src/server/Client.zig @@ -3,10 +3,16 @@ const std = @import("std"); const Client = @This(); +pub const Msgs = union(enum) { + MSG: Message.Msg, + HMSG: Message.HMsg, +}; + connect: ?Message.Connect, // Messages for this client to receive. recv_queue: *std.Io.Queue(Message), +msg_queue: *std.Io.Queue(Msgs), from_client: *std.Io.Reader, to_client: *std.Io.Writer, @@ -14,12 +20,14 @@ to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, recv_queue: *std.Io.Queue(Message), + msg_queue: *std.Io.Queue(Msgs), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, .recv_queue = recv_queue, + .msg_queue = msg_queue, .from_client = in, .to_client = out, }; @@ -33,72 +41,98 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { } pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { - var msgs: [256]Message = undefined; + var msgs_buf: [1024]Msgs = undefined; + + var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable"); + errdefer _ = recv_msgs_task.cancel(io) catch {}; + + var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + errdefer _ = recv_proto_task.cancel(io) catch {}; while (true) { - const len = try self.recv_queue.get(io, &msgs, 1); - std.debug.assert(len <= msgs.len); - for (0..len) |i| { - const msg = msgs[i]; - defer switch (msg) { - .MSG => |m| m.deinit(alloc), - .HMSG => |h| h.deinit(alloc), - else => {}, - }; - errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { - .MSG => |m| { - m.deinit(alloc); - }, - else => {}, - }; - switch (msg) { - .@"+OK" => { - _ = try self.to_client.write("+OK\r\n"); - }, - .PONG => { - _ = try self.to_client.write("PONG\r\n"); - }, - .INFO => |info| { - _ = try self.to_client.write("INFO "); - try std.json.Stringify.value(info, .{}, self.to_client); - _ = try self.to_client.write("\r\n"); - }, - .MSG => |m| { - @branchHint(.likely); - try self.to_client.print( - "MSG {s} {s} {s} {d}\r\n{s}\r\n", - .{ - m.subject, - m.sid, - m.reply_to orelse "", - m.payload.len, - m.payload, + switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) { + .msgs => |msgs_err| { + @branchHint(.likely); + defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable; + const msgs = try msgs_err; + for (0..msgs.len) |i| { + const msg = msgs[i]; + defer switch (msg) { + .MSG => |m| m.deinit(alloc), + .HMSG => |h| h.deinit(alloc), + }; + errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { + .MSG => |m| { + m.deinit(alloc); }, - ); - }, - .HMSG => |hmsg| { - @branchHint(.likely); - try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ - hmsg.msg.subject, - hmsg.msg.sid, - hmsg.msg.reply_to orelse "", - hmsg.header_bytes, - hmsg.msg.payload.len, - hmsg.msg.payload, - }); - }, - .@"-ERR" => |s| { - _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); - }, - else => |m| { - std.debug.panic("unimplemented write: {any}\n", .{m}); - }, - } + .HMSG => |h| { + h.deinit(alloc); + }, + }; + switch (msg) { + .MSG => |m| { + try self.to_client.print( + "MSG {s} {s} {s} {d}\r\n{s}\r\n", + .{ + m.subject, + m.sid, + m.reply_to orelse "", + m.payload.len, + m.payload, + }, + ); + }, + .HMSG => |hmsg| { + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + hmsg.msg.payload, + }); + }, + } + } + }, + .proto => |msg_err| { + @branchHint(.unlikely); + defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable; + const msg = try msg_err; + switch (msg) { + .@"+OK" => { + _ = try self.to_client.write("+OK\r\n"); + }, + .PONG => { + _ = try self.to_client.write("PONG\r\n"); + }, + .INFO => |info| { + _ = try self.to_client.write("INFO "); + try std.json.Stringify.value(info, .{}, self.to_client); + _ = try self.to_client.write("\r\n"); + }, + .@"-ERR" => |s| { + _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); + }, + else => |m| { + std.debug.panic("unimplemented write: {any}\n", .{m}); + }, + } + }, } try self.to_client.flush(); } } +fn recvProtoMsg(self: *Client, io: std.Io) !Message { + return self.recv_queue.getOne(io); +} + +fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs { + const len = try self.msg_queue.get(io, buf, 1); + return buf[0..len]; +} + pub fn send(self: *Client, io: std.Io, msg: Message) !void { try self.recv_queue.putOne(io, msg); } diff --git a/src/server/Server.zig b/src/server/Server.zig index f7b849c..3fedfcb 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -16,13 +16,14 @@ pub const MessageType = message_parser.MessageType; pub const Message = message_parser.Message; const ServerInfo = Message.ServerInfo; pub const Client = @import("./Client.zig"); +const Msgs = Client.Msgs; const Server = @This(); pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, - queue: *Queue(Message), + queue: *Queue(Msgs), fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -152,29 +153,34 @@ fn handleConnection( const in = &reader.interface; // Set up buffer queue - const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message)); + const qbuf: []Message = try server_allocator.alloc(Message, 16); defer server_allocator.free(qbuf); - var queue: Queue(Message) = .init(qbuf); + var recv_queue: Queue(Message) = .init(qbuf); + defer recv_queue.close(io); + + const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128)); + defer server_allocator.free(mbuf); + var msgs_queue: Queue(Msgs) = .init(mbuf); defer { - queue.close(io); - while (queue.getOne(io)) |msg| { + msgs_queue.close(io); + while (msgs_queue.getOne(io)) |msg| { switch (msg) { .MSG => |m| m.deinit(server_allocator), .HMSG => |h| h.deinit(server_allocator), - else => {}, } } else |_| {} } // Create client - var client: Client = .init(null, &queue, in, out); + var client: Client = .init(null, &recv_queue, &msgs_queue, in, out); defer client.deinit(server_allocator); try server.addClient(server_allocator, id, &client); defer server.removeClient(io, server_allocator, id); // Do initial handshake with client - try queue.putOne(io, .{ .INFO = server.info }); + // try recv_queue.putOne(io, .PONG); + try recv_queue.putOne(io, .{ .INFO = server.info }); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); defer client_task.cancel(io) catch {}; @@ -186,18 +192,19 @@ fn handleConnection( // Respond to ping with pong. try client.send(io, .PONG); }, - .PUB, .HPUB => { + .PUB => |pb| { + @branchHint(.likely); + defer pb.deinit(server_allocator); + try server.publishMessage(io, server_allocator, &client, msg); + }, + .HPUB => |hp| { @branchHint(.likely); - defer switch (msg) { - .PUB => |pb| pb.deinit(server_allocator), - .HPUB => |hp| hp.deinit(server_allocator), - else => unreachable, - }; + defer hp.deinit(server_allocator); try server.publishMessage(io, server_allocator, &client, msg); }, .SUB => |sub| { defer sub.deinit(server_allocator); - try server.subscribe(io, server_allocator, id, &queue, sub); + try server.subscribe(io, server_allocator, id, &msgs_queue, sub); }, .UNSUB => |unsub| { defer unsub.deinit(server_allocator); @@ -295,7 +302,7 @@ fn subscribe( io: Io, gpa: Allocator, id: usize, - queue: *Queue(Message), + queue: *Queue(Msgs), msg: Message.Sub, ) !void { try server.subs_lock.lock(io); @@ -339,7 +346,7 @@ fn getBufferSizes(io: Io) struct { usize, usize } { const default = .{ default_size, default_size }; const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { - log.err("couldn't open /proc/sys/net/core", .{}); + log.warn("couldn't open /proc/sys/net/core", .{}); return default; }; diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 8b4859b..ff1a573 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -268,34 +268,7 @@ pub const Message = union(enum) { switch (operation) { .CONNECT => { - // for storing the json string - var connect_string_writer_allocating: AllocatingWriter = .init(alloc); - defer connect_string_writer_allocating.deinit(); - var connect_string_writer = &connect_string_writer_allocating.writer; - - // for parsing the json string - var connect_arena_allocator: ArenaAllocator = .init(alloc); - defer connect_arena_allocator.deinit(); - const connect_allocator = connect_arena_allocator.allocator(); - - try in.discardAll(1); // throw away space - - // Should read the next JSON object to the fixed buffer writer. - _ = try in.streamDelimiter(connect_string_writer, '}'); - try connect_string_writer.writeByte('}'); - try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - - const connect_str = try connect_string_writer_allocating.toOwnedSlice(); - defer alloc.free(connect_str); - // TODO: should be CONNECTION allocator - const res = try std.json.parseFromSliceLeaky( - Connect, - connect_allocator, - connect_str, - .{ .allocate = .alloc_always }, - ); - - return .{ .CONNECT = try res.dupe(alloc) }; + return parseConnect(alloc, in); }, .PUB => { @branchHint(.likely); @@ -324,6 +297,37 @@ pub const Message = union(enum) { } }; +fn parseConnect(alloc: Allocator, in: *Reader) !Message { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; + + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); + + try in.discardAll(1); // throw away space + + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + // TODO: should be CONNECTION allocator + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return .{ .CONNECT = try res.dupe(alloc) }; +} + fn parseSub(alloc: Allocator, in: *Reader) !Message { try in.discardAll(1); // throw away space const subject = try readSubject(alloc, in, .sub); |
