summaryrefslogtreecommitdiff
path: root/src/server/client.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/client.zig')
-rw-r--r--src/server/client.zig79
1 files changed, 20 insertions, 59 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index 69e655b..e997a4a 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -2,85 +2,46 @@ const Message = @import("message_parser.zig").Message;
const std = @import("std");
pub const ClientState = struct {
- connect: Message.AllocatedConnect,
+ connect: ?Message.Connect,
- /// Messages that this client should receive.
- recv_queue: std.Io.Queue(Message) = undefined,
- recv_queue_buffer: [1024]Message = undefined,
- // Used to take ownership of values as they are put in the queue.
- recv_alloc: std.mem.Allocator,
+ write_lock: std.Io.Mutex,
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
- task: ?std.Io.Future(void) = null,
-
pub fn init(
- connect: Message.AllocatedConnect,
- alloc: std.mem.Allocator,
+ connect: ?Message.Connect,
in: *std.Io.Reader,
out: *std.Io.Writer,
- ) !ClientState {
- var res: ClientState = .{
+ ) ClientState {
+ return .{
.connect = connect,
- .recv_alloc = alloc,
+ .write_lock = .init,
.from_client = in,
.to_client = out,
};
- res.recv_queue = .init(&res.recv_queue_buffer);
-
- return res;
- }
-
- pub fn start(self: *ClientState, io: std.Io) !void {
- self.task = try io.concurrent(processWrite, .{ self, io });
- }
-
- fn processWrite(
- self: *ClientState,
- io: std.Io,
- ) void {
- while (true) {
- const message = self.recv_queue.getOne(io) catch break;
- switch (message) {
- .@"+ok" => {
- writeOk(self.to_client) catch break;
- },
- .pong => {
- writePong(self.to_client) catch break;
- },
- .info => |info| {
- writeInfo(self.to_client, info) catch break;
- },
- .msg => |m| {
- defer m.deinit(self.recv_alloc);
- writeMsg(self.to_client, m) catch break;
- },
- else => {
- std.debug.panic("unimplemented write", .{});
- },
- }
- }
- }
-
- pub fn deinit(self: *ClientState, io: std.Io, allocator: std.mem.Allocator) void {
- if (self.task) |*t| {
- t.cancel(io);
- }
- self.connect.deinit();
- _ = allocator;
- // allocator.destroy(self.recv_queue);
}
/// Return true if the value was put in the clients buffer to process, else false.
pub fn send(self: *ClientState, io: std.Io, msg: Message) !void {
- // Client needs to own msg that is put in its queue
+ try self.write_lock.lock(io);
+ defer self.write_lock.unlock(io);
+
switch (msg) {
+ .@"+ok" => {
+ try writeOk(self.to_client);
+ },
+ .pong => {
+ try writePong(self.to_client);
+ },
+ .info => |info| {
+ try writeInfo(self.to_client, info);
+ },
.msg => |m| {
- try self.recv_queue.putOne(io, .{ .msg = try m.dupe(self.recv_alloc) });
+ try writeMsg(self.to_client, m);
},
else => {
- try self.recv_queue.putOne(io, msg);
+ std.debug.panic("unimplemented write", .{});
},
}
}