diff options
| -rw-r--r-- | src/server/client.zig | 14 | ||||
| -rw-r--r-- | src/server/main.zig | 18 | ||||
| -rw-r--r-- | src/server/message_parser.zig | 8 |
3 files changed, 24 insertions, 16 deletions
diff --git a/src/server/client.zig b/src/server/client.zig index 004b0f9..2ce3c38 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -6,18 +6,20 @@ const Client = @This(); connect: ?Message.Connect, // Messages for this client to receive. -recv_queue: ?*std.Io.Queue(Message) = null, +recv_queue: *std.Io.Queue(Message), from_client: *std.Io.Reader, to_client: *std.Io.Writer, pub fn init( connect: ?Message.Connect, + recv_queue: *std.Io.Queue(Message), in: *std.Io.Reader, out: *std.Io.Writer, ) Client { return .{ .connect = connect, + .recv_queue = recv_queue, .from_client = in, .to_client = out, }; @@ -30,17 +32,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void { self.* = undefined; } -pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void { - self.recv_queue = queue; +pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { var msgs: [8]Message = undefined; while (true) { - const len = try queue.get(io, &msgs, 1); + const len = try self.recv_queue.get(io, &msgs, 1); std.debug.assert(len <= msgs.len); for (0..len) |i| { const msg = msgs[i]; defer switch (msg) { .msg => |m| m.deinit(alloc), + .hmsg => |h| h.deinit(alloc), else => {}, }; errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { @@ -98,9 +100,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io } pub fn send(self: *Client, io: std.Io, msg: Message) !void { - if (self.recv_queue) |queue| { - try queue.putOne(io, msg); - } else @panic("Must start() the client before sending it messages."); + try self.recv_queue.putOne(io, msg); } pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { diff --git a/src/server/main.zig b/src/server/main.zig index 4c1093f..92ac671 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -160,13 +160,7 @@ fn handleConnection( var reader = stream.reader(io, &r_buffer); const in = &reader.interface; - // Create client - var client: Client = .init(null, in, out); - defer client.deinit(server_allocator); - - try server.addClient(server_allocator, id, &client); - defer server.removeClient(io, server_allocator, id); - + // Set up buffer queue var qbuf: [8]Message = undefined; var queue: std.Io.Queue(Message) = .init(&qbuf); defer { @@ -174,15 +168,23 @@ fn handleConnection( while (queue.getOne(io)) |msg| { switch (msg) { .msg => |m| m.deinit(server_allocator), + .hmsg => |h| h.deinit(server_allocator), else => {}, } } else |_| {} } + // Create client + var client: Client = .init(null, &queue, in, out); + defer client.deinit(server_allocator); + + try server.addClient(server_allocator, id, &client); + defer server.removeClient(io, server_allocator, id); + // Do initial handshake with client try queue.putOne(io, .{ .info = server.info }); - var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue }); + var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); defer client_task.cancel(io) catch {}; // Messages are owned by the server after they are received from the client diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 1b7333a..52d1422 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -250,7 +250,13 @@ pub const Message = union(MessageType) { /// An error should be handled by cleaning up this connection. pub fn next(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { var operation_string: std.ArrayList(u8) = blk: { - var buf: ["CONTINUE".len + 1]u8 = undefined; + comptime var buf_len = 0; + comptime { + for (client_types.keys()) |key| { + buf_len = @max(buf_len, key.len); + } + } + var buf: [buf_len]u8 = undefined; break :blk .initBuffer(&buf); }; |
