summaryrefslogtreecommitdiff
path: root/src/server/client.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/client.zig')
-rw-r--r--src/server/client.zig221
1 files changed, 0 insertions, 221 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
deleted file mode 100644
index 2ce3c38..0000000
--- a/src/server/client.zig
+++ /dev/null
@@ -1,221 +0,0 @@
-const Message = @import("message_parser.zig").Message;
-const std = @import("std");
-
-const Client = @This();
-
-connect: ?Message.Connect,
-
-// Messages for this client to receive.
-recv_queue: *std.Io.Queue(Message),
-
-from_client: *std.Io.Reader,
-to_client: *std.Io.Writer,
-
-pub fn init(
- connect: ?Message.Connect,
- recv_queue: *std.Io.Queue(Message),
- in: *std.Io.Reader,
- out: *std.Io.Writer,
-) Client {
- return .{
- .connect = connect,
- .recv_queue = recv_queue,
- .from_client = in,
- .to_client = out,
- };
-}
-
-pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
- if (self.connect) |c| {
- c.deinit(alloc);
- }
- self.* = undefined;
-}
-
-pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
- var msgs: [8]Message = undefined;
-
- while (true) {
- const len = try self.recv_queue.get(io, &msgs, 1);
- std.debug.assert(len <= msgs.len);
- for (0..len) |i| {
- const msg = msgs[i];
- defer switch (msg) {
- .msg => |m| m.deinit(alloc),
- .hmsg => |h| h.deinit(alloc),
- else => {},
- };
- errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
- .msg => |m| {
- m.deinit(alloc);
- },
- else => {},
- };
- switch (msg) {
- .@"+ok" => {
- _ = try self.to_client.write("+OK\r\n");
- },
- .pong => {
- _ = try self.to_client.write("PONG\r\n");
- },
- .info => |info| {
- _ = try self.to_client.write("INFO ");
- try std.json.Stringify.value(info, .{}, self.to_client);
- _ = try self.to_client.write("\r\n");
- },
- .msg => |m| {
- @branchHint(.likely);
- try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n{s}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
- m.payload,
- },
- );
- },
- .hmsg => |hmsg| {
- @branchHint(.likely);
- try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
- hmsg.msg.subject,
- hmsg.msg.sid,
- hmsg.msg.reply_to orelse "",
- hmsg.header_bytes,
- hmsg.msg.payload.len,
- hmsg.msg.payload,
- });
- },
- .@"-err" => |s| {
- _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
- },
- else => |m| {
- std.debug.panic("unimplemented write: {any}\n", .{m});
- },
- }
- }
- try self.to_client.flush();
- }
-}
-
-pub fn send(self: *Client, io: std.Io, msg: Message) !void {
- try self.recv_queue.putOne(io, msg);
-}
-
-pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
- return Message.next(allocator, self.from_client);
-}
-
-test {
- const io = std.testing.io;
- const gpa = std.testing.allocator;
-
- var from_client: std.Io.Reader = .fixed(
- "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
- "PING\r\n",
- );
- var from_client_buf: [1024]Message = undefined;
- var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf);
-
- {
- // Simulate stream
- while (Message.next(gpa, &from_client)) |msg| {
- try from_client_queue.putOne(io, msg);
- } else |err| switch (err) {
- error.EndOfStream => from_client_queue.close(io),
- else => return err,
- }
-
- while (from_client_queue.getOne(io)) |msg| {
- switch (msg) {
- .connect => |*c| {
- std.debug.print("Message: {any}\n", .{msg});
- c.deinit(gpa);
- },
- else => {
- std.debug.print("Message: {any}\n", .{msg});
- },
- }
- } else |_| {}
- }
-
- from_client_queue = .init(&from_client_buf);
- // Reset the reader to process it again.
- from_client.seek = 0;
-
- // {
- // const SemiClient = struct {
- // q: std.Io.Queue(Message),
-
- // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
- // defer std.debug.print("done parse\n", .{});
- // while (Message.next(gpa, in)) |msg| {
- // self.q.putOne(ioh, msg) catch return;
- // } else |_| {}
- // }
-
- // fn next(self: *@This(), ioh: std.Io) !Message {
- // return self.q.getOne(ioh);
- // }
-
- // fn printAll(self: *@This(), ioh: std.Io) void {
- // defer std.debug.print("done print\n", .{});
- // while (self.next(ioh)) |*msg| {
- // std.debug.print("Client msg: {any}\n", .{msg});
- // switch (msg.*) {
- // .connect => |c| {
- // c.deinit(gpa);
- // },
- // else => {},
- // }
- // } else |_| {}
- // }
- // };
-
- // var c: SemiClient = .{ .q = from_client_queue };
- // var group: std.Io.Group = .init;
- // defer group.wait(io);
-
- // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
- // @panic("could not start printAll\n");
- // };
-
- // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
- // @panic("could not start printAll\n");
- // };
- // }
-
- ////////
-
- // const connect = (Message.next(gpa, &from_client) catch unreachable).connect;
-
- // var to_client_alloc: std.Io.Writer.Allocating = .init(gpa);
- // defer to_client_alloc.deinit();
- // var to_client = to_client_alloc.writer;
-
- // var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client);
- // defer client.deinit(gpa);
-
- // {
- // var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable;
- // defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail");
-
- // var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable;
- // defer timeout.cancel(io) catch {};
-
- // switch (try io.select(.{
- // .get_next = &get_next,
- // .timeout = &timeout,
- // })) {
- // .get_next => |next| {
- // std.debug.print("next is {any}\n", .{next});
- // try std.testing.expect((next catch |err| return err) == .ping);
- // },
- // .timeout => {
- // std.debug.print("reached timeout\n", .{});
- // return error.TestUnexpectedResult;
- // },
- // }
- // }
-}