summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/client.zig96
-rw-r--r--src/server/main.zig23
2 files changed, 59 insertions, 60 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index e997a4a..44de542 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -1,59 +1,59 @@
const Message = @import("message_parser.zig").Message;
const std = @import("std");
-pub const ClientState = struct {
+const Client = @This();
+
+connect: ?Message.Connect,
+
+write_lock: std.Io.Mutex,
+
+from_client: *std.Io.Reader,
+to_client: *std.Io.Writer,
+
+pub fn init(
connect: ?Message.Connect,
+ in: *std.Io.Reader,
+ out: *std.Io.Writer,
+) Client {
+ return .{
+ .connect = connect,
+ .write_lock = .init,
+ .from_client = in,
+ .to_client = out,
+ };
+}
- write_lock: std.Io.Mutex,
-
- from_client: *std.Io.Reader,
- to_client: *std.Io.Writer,
-
- pub fn init(
- connect: ?Message.Connect,
- in: *std.Io.Reader,
- out: *std.Io.Writer,
- ) ClientState {
- return .{
- .connect = connect,
- .write_lock = .init,
- .from_client = in,
- .to_client = out,
- };
- }
+/// Return true if the value was put in the clients buffer to process, else false.
+pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+ try self.write_lock.lock(io);
+ defer self.write_lock.unlock(io);
- /// Return true if the value was put in the clients buffer to process, else false.
- pub fn send(self: *ClientState, io: std.Io, msg: Message) !void {
- try self.write_lock.lock(io);
- defer self.write_lock.unlock(io);
-
- switch (msg) {
- .@"+ok" => {
- try writeOk(self.to_client);
- },
- .pong => {
- try writePong(self.to_client);
- },
- .info => |info| {
- try writeInfo(self.to_client, info);
- },
- .msg => |m| {
- try writeMsg(self.to_client, m);
- },
- else => {
- std.debug.panic("unimplemented write", .{});
- },
- }
+ switch (msg) {
+ .@"+ok" => {
+ try writeOk(self.to_client);
+ },
+ .pong => {
+ try writePong(self.to_client);
+ },
+ .info => |info| {
+ try writeInfo(self.to_client, info);
+ },
+ .msg => |m| {
+ try writeMsg(self.to_client, m);
+ },
+ else => {
+ std.debug.panic("unimplemented write", .{});
+ },
}
+}
- pub fn next(self: *ClientState, allocator: std.mem.Allocator) !Message {
- // std.debug.print("in client awaiting next message\n", .{});
- // errdefer std.debug.print("actually it was canceled\n", .{});
- // defer std.debug.print("client returning next message!\n", .{});
- return Message.next(allocator, self.from_client);
- // return self.send_queue.getOne(io);
- }
-};
+pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
+ // std.debug.print("in client awaiting next message\n", .{});
+ // errdefer std.debug.print("actually it was canceled\n", .{});
+ // defer std.debug.print("client returning next message!\n", .{});
+ return Message.next(allocator, self.from_client);
+ // return self.send_queue.getOne(io);
+}
fn writeOk(out: *std.Io.Writer) !void {
_ = try out.write("+OK\r\n");
diff --git a/src/server/main.zig b/src/server/main.zig
index 4f52bec..4d62e93 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -2,7 +2,7 @@ const std = @import("std");
const Message = @import("./message_parser.zig").Message;
pub const ServerInfo = Message.ServerInfo;
-const ClientState = @import("./client.zig").ClientState;
+const Client = @import("./client.zig");
const Server = @This();
const Subscription = struct {
@@ -12,7 +12,7 @@ const Subscription = struct {
};
info: ServerInfo,
-clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
+clients: std.AutoHashMapUnmanaged(usize, *Client) = .empty,
subs_lock: std.Io.Mutex = .init,
subscriptions: std.ArrayList(Subscription) = .empty,
@@ -43,7 +43,6 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
defer threaded.deinit();
const io = threaded.io();
-
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -88,7 +87,7 @@ fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
}
}
-fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *ClientState) !void {
+fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
// server.clients.lockPointers();
try server.clients.put(allocator, id, client);
// server.clients.unlockPointers();
@@ -132,25 +131,25 @@ fn handleConnection(
var reader = stream.reader(io, r_buffer);
const in = &reader.interface;
- var client_state: ClientState = .init(null, in, out);
- try client_state.send(io, .{ .info = server.info });
+ var client: Client = .init(null, in, out);
+ try client.send(io, .{ .info = server.info });
var connect_arena: std.heap.ArenaAllocator = .init(allocator);
defer connect_arena.deinit();
- client_state.connect = (Message.next(connect_arena.allocator(), in) catch return).connect;
+ client.connect = (Message.next(connect_arena.allocator(), in) catch return).connect;
- try server.addClient(server_allocator, id, &client_state);
+ try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
// Messages are owned by the server after they are received from the client
- while (client_state.next(server_allocator)) |msg| {
+ while (client.next(server_allocator)) |msg| {
switch (msg) {
.ping => {
// Respond to ping with pong.
- try client_state.send(io, .pong);
+ try client.send(io, .pong);
},
.@"pub" => |pb| {
- _ = io.async(publishMessage, .{ server, io, server_allocator, &client_state, pb });
+ _ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb });
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -181,7 +180,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
-fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *ClientState, msg: Message.Pub) !void {
+fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {