summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/client.zig23
-rw-r--r--src/server/main.zig7
2 files changed, 14 insertions, 16 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index ed1d33e..70673c2 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -2,40 +2,36 @@ const Message = @import("message_parser.zig").Message;
const std = @import("std");
pub const ClientState = struct {
- id: usize,
connect: Message.AllocatedConnect,
/// Messages that this client should receive.
- recv_queue: *std.Io.Queue(Message),
+ recv_queue: std.Io.Queue(Message) = undefined,
recv_queue_buffer: [1024]Message = undefined,
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
- task: std.Io.Future(void),
+ task: ?std.Io.Future(void) = null,
pub fn init(
- io: std.Io,
- allocator: std.mem.Allocator,
- id: usize,
connect: Message.AllocatedConnect,
in: *std.Io.Reader,
out: *std.Io.Writer,
) !ClientState {
var res: ClientState = .{
- .id = id,
.connect = connect,
- .recv_queue = try allocator.create(std.Io.Queue(Message)),
.from_client = in,
.to_client = out,
- .task = undefined,
};
- res.recv_queue.* = .init(&res.recv_queue_buffer);
- res.task = try io.concurrent(processWrite, .{ &res, io });
+ res.recv_queue = .init(&res.recv_queue_buffer);
return res;
}
+ pub fn start(self: *ClientState, io: std.Io) !void {
+ self.task = try io.concurrent(processWrite, .{ self, io });
+ }
+
fn processWrite(
self: *ClientState,
io: std.Io,
@@ -60,11 +56,12 @@ pub const ClientState = struct {
},
}
}
- self.task.cancel(io);
}
pub fn deinit(self: *ClientState, io: std.Io, allocator: std.mem.Allocator) void {
- self.task.cancel(io);
+ if (self.task) |*t| {
+ t.cancel(io);
+ }
self.connect.deinit();
_ = allocator;
// allocator.destroy(self.recv_queue);
diff --git a/src/server/main.zig b/src/server/main.zig
index 7e3f503..0045c1c 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -100,7 +100,8 @@ fn handleConnection(
var connect_arena: std.heap.ArenaAllocator = .init(allocator);
defer connect_arena.deinit();
const connect = (Message.next(connect_arena.allocator(), in) catch return).connect;
- var client_state: ClientState = try .init(io, allocator, id, connect, in, out);
+ var client_state: ClientState = try .init(connect, in, out);
+ try client_state.start(io);
defer client_state.deinit(io, allocator);
try server.addClient(allocator, id, &client_state);
@@ -123,10 +124,10 @@ fn handleConnection(
}
},
.sub => |sub| {
- try server.subscribe(allocator, client_state.id, sub);
+ try server.subscribe(allocator, id, sub);
},
.unsub => |unsub| {
- try server.unsubscribe(client_state.id, unsub);
+ try server.unsubscribe(id, unsub);
},
else => |e| {
std.debug.panic("Unimplemented message: {any}\n", .{e});