summaryrefslogtreecommitdiff
path: root/src/server/Client.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/Client.zig')
-rw-r--r--src/server/Client.zig221
1 files changed, 221 insertions, 0 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
new file mode 100644
index 0000000..2ce3c38
--- /dev/null
+++ b/src/server/Client.zig
@@ -0,0 +1,221 @@
+const Message = @import("message_parser.zig").Message;
+const std = @import("std");
+
+const Client = @This();
+
+connect: ?Message.Connect,
+
+// Messages for this client to receive.
+recv_queue: *std.Io.Queue(Message),
+
+from_client: *std.Io.Reader,
+to_client: *std.Io.Writer,
+
+pub fn init(
+ connect: ?Message.Connect,
+ recv_queue: *std.Io.Queue(Message),
+ in: *std.Io.Reader,
+ out: *std.Io.Writer,
+) Client {
+ return .{
+ .connect = connect,
+ .recv_queue = recv_queue,
+ .from_client = in,
+ .to_client = out,
+ };
+}
+
+pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
+ if (self.connect) |c| {
+ c.deinit(alloc);
+ }
+ self.* = undefined;
+}
+
+pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
+ var msgs: [8]Message = undefined;
+
+ while (true) {
+ const len = try self.recv_queue.get(io, &msgs, 1);
+ std.debug.assert(len <= msgs.len);
+ for (0..len) |i| {
+ const msg = msgs[i];
+ defer switch (msg) {
+ .msg => |m| m.deinit(alloc),
+ .hmsg => |h| h.deinit(alloc),
+ else => {},
+ };
+ errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
+ .msg => |m| {
+ m.deinit(alloc);
+ },
+ else => {},
+ };
+ switch (msg) {
+ .@"+ok" => {
+ _ = try self.to_client.write("+OK\r\n");
+ },
+ .pong => {
+ _ = try self.to_client.write("PONG\r\n");
+ },
+ .info => |info| {
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
+ },
+ .msg => |m| {
+ @branchHint(.likely);
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n{s}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ m.payload,
+ },
+ );
+ },
+ .hmsg => |hmsg| {
+ @branchHint(.likely);
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
+ hmsg.msg.subject,
+ hmsg.msg.sid,
+ hmsg.msg.reply_to orelse "",
+ hmsg.header_bytes,
+ hmsg.msg.payload.len,
+ hmsg.msg.payload,
+ });
+ },
+ .@"-err" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ },
+ else => |m| {
+ std.debug.panic("unimplemented write: {any}\n", .{m});
+ },
+ }
+ }
+ try self.to_client.flush();
+ }
+}
+
+pub fn send(self: *Client, io: std.Io, msg: Message) !void {
+ try self.recv_queue.putOne(io, msg);
+}
+
+pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
+ return Message.next(allocator, self.from_client);
+}
+
+test {
+ const io = std.testing.io;
+ const gpa = std.testing.allocator;
+
+ var from_client: std.Io.Reader = .fixed(
+ "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
+ "PING\r\n",
+ );
+ var from_client_buf: [1024]Message = undefined;
+ var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf);
+
+ {
+ // Simulate stream
+ while (Message.next(gpa, &from_client)) |msg| {
+ try from_client_queue.putOne(io, msg);
+ } else |err| switch (err) {
+ error.EndOfStream => from_client_queue.close(io),
+ else => return err,
+ }
+
+ while (from_client_queue.getOne(io)) |msg| {
+ switch (msg) {
+ .connect => |*c| {
+ std.debug.print("Message: {any}\n", .{msg});
+ c.deinit(gpa);
+ },
+ else => {
+ std.debug.print("Message: {any}\n", .{msg});
+ },
+ }
+ } else |_| {}
+ }
+
+ from_client_queue = .init(&from_client_buf);
+ // Reset the reader to process it again.
+ from_client.seek = 0;
+
+ // {
+ // const SemiClient = struct {
+ // q: std.Io.Queue(Message),
+
+ // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
+ // defer std.debug.print("done parse\n", .{});
+ // while (Message.next(gpa, in)) |msg| {
+ // self.q.putOne(ioh, msg) catch return;
+ // } else |_| {}
+ // }
+
+ // fn next(self: *@This(), ioh: std.Io) !Message {
+ // return self.q.getOne(ioh);
+ // }
+
+ // fn printAll(self: *@This(), ioh: std.Io) void {
+ // defer std.debug.print("done print\n", .{});
+ // while (self.next(ioh)) |*msg| {
+ // std.debug.print("Client msg: {any}\n", .{msg});
+ // switch (msg.*) {
+ // .connect => |c| {
+ // c.deinit(gpa);
+ // },
+ // else => {},
+ // }
+ // } else |_| {}
+ // }
+ // };
+
+ // var c: SemiClient = .{ .q = from_client_queue };
+ // var group: std.Io.Group = .init;
+ // defer group.wait(io);
+
+ // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
+ // @panic("could not start printAll\n");
+ // };
+
+ // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
+ // @panic("could not start printAll\n");
+ // };
+ // }
+
+ ////////
+
+ // const connect = (Message.next(gpa, &from_client) catch unreachable).connect;
+
+ // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa);
+ // defer to_client_alloc.deinit();
+ // var to_client = to_client_alloc.writer;
+
+ // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client);
+ // defer client.deinit(gpa);
+
+ // {
+ // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable;
+ // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail");
+
+ // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable;
+ // defer timeout.cancel(io) catch {};
+
+ // switch (try io.select(.{
+ // .get_next = &get_next,
+ // .timeout = &timeout,
+ // })) {
+ // .get_next => |next| {
+ // std.debug.print("next is {any}\n", .{next});
+ // try std.testing.expect((next catch |err| return err) == .ping);
+ // },
+ // .timeout => {
+ // std.debug.print("reached timeout\n", .{});
+ // return error.TestUnexpectedResult;
+ // },
+ // }
+ // }
+}