summaryrefslogtreecommitdiff
path: root/src/Server/Client.zig
blob: 26be79fe21cbb53bdbba1cef50f3704168adbf81 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
const message = @import("message.zig");
const parse = message.parse;
const Message = message.Message;
const std = @import("std");
const Queue = std.Io.Queue;

const Client = @This();

/// Optional `Message.Connect` value associated with this client.
/// When non-null, `deinit` releases it with the allocator passed to `deinit`.
connect: ?Message.Connect,
/// Byte queue for this client to receive.
/// `send` appends bytes to it and `start` drains it into `to_client`.
recv_queue: *Queue(u8),
/// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes).
/// NOTE(review): `send` does not acquire this lock itself; presumably callers
/// that may write concurrently are expected to hold it — confirm against callers.
recv_queue_write_lock: std.Io.Mutex = .init,

/// Reader that `next` parses control messages from.
from_client: *std.Io.Reader,
/// Writer whose buffer `start` fills from `recv_queue` and then flushes.
to_client: *std.Io.Writer,

/// Builds a `Client` from its parts.
///
/// `connect` may be null. `recv_queue`, `in` and `out` are stored as-is
/// (no copies are made), so they must stay valid for as long as the client
/// is used. `recv_queue_write_lock` takes its default value.
pub fn init(
    connect: ?Message.Connect,
    recv_queue: *Queue(u8),
    in: *std.Io.Reader,
    out: *std.Io.Writer,
) Client {
    const client: Client = .{
        .from_client = in,
        .to_client = out,
        .recv_queue = recv_queue,
        .connect = connect,
    };
    return client;
}

/// Releases the stored `connect` value (if any) using `alloc`, then
/// invalidates the client by overwriting it with `undefined`.
/// The queue, reader and writer pointers are not released here.
pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
    // Runs last, after the connect data has been released.
    defer self.* = undefined;
    if (self.connect) |connect_info| connect_info.deinit(alloc);
}

/// Forwards bytes from `recv_queue` to `to_client` until an error occurs.
///
/// Each iteration receives at least one byte from the queue directly into
/// the writer's buffer, marks those bytes as buffered, and flushes them.
/// The loop never exits normally; the function only returns when `get` or
/// `flush` fails, propagating that error.
///
/// Asserts that the writer has a non-empty buffer and nothing buffered yet.
pub fn start(self: *Client, io: std.Io) !void {
    const out = self.to_client;
    std.debug.assert(out.buffer.len > 0);
    std.debug.assert(out.end == 0);
    while (true) {
        // The queue writes straight into the writer's buffer, so the byte
        // count it returns is exactly the new buffered length.
        const received = try self.recv_queue.get(io, out.buffer, 1);
        out.end = received;
        try out.flush();
    }
}

/// Enqueues all of `msg` onto `recv_queue` for `start` to forward to the client.
/// Returns any error from the queue's `putAll`.
///
/// NOTE(review): this does not acquire `recv_queue_write_lock`; presumably
/// callers that may send concurrently hold it around this call — confirm.
pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
    return self.recv_queue.putAll(io, msg);
}

// Exercises `send` together with a concurrently running `start`: bytes passed
// to `send` must show up, unmodified, in the writer that `start` flushes to.
test send {
    const io = std.testing.io;
    const gpa = std.testing.allocator;

    // The backing buffer must outlive `to_client`, so it is declared in the
    // test's own scope rather than inside a nested block.
    var out_buf: [1024]u8 = undefined;
    var to_client: std.Io.Writer = .fixed(&out_buf);
    var recv_queue: Queue(u8) = .init(&.{});
    // `from_client` is never read by `send`/`start`, so it is left undefined.
    var client: Client = .init(null, &recv_queue, undefined, &to_client);
    defer client.deinit(gpa);

    var c_task = try io.concurrent(Client.start, .{ &client, io });
    defer c_task.cancel(io) catch {};

    {
        try client.send(io, "PONG\r\n");
        // Wait for the concurrent client task to write to the writer
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
    }

    // Discard what was captured so the next check starts from an empty writer.
    to_client.end = 0;

    {
        // `send` takes raw bytes, so the message is given in its wire form.
        const wire = "MSG subject 1 reply 7\r\npayload\r\n";
        try client.send(io, wire);
        // Wait for the concurrent client task to write to the writer
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, wire, to_client.buffered());
    }
}

/// Parses the next control message from `from_client` using `parse.control`
/// and returns it; any error from `parse.control` is returned to the caller.
pub fn next(self: *Client) !message.Control {
    return parse.control(self.from_client);
}

// Feeds a CONNECT line followed by a PING line through `next` and checks that
// they are returned in order with the expected contents.
test next {
    const gpa = std.testing.allocator;

    var from_client: std.Io.Reader = .fixed(
        "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
            "equired\":false,\"name\":\"NATS CLI Version v0.2." ++
            "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
            "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
            "PING\r\n",
    );

    // `next` only reads `from_client`; the queue and writer are never touched,
    // so they are left undefined.
    var client: Client = .init(null, undefined, &from_client, undefined);

    {
        // Simulate stream

        {
            const msg = try client.next();
            try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
            // NOTE(review): `next` no longer takes an allocator; confirm the
            // parsed CONNECT data is actually owned by `gpa` before freeing it.
            defer msg.CONNECT.deinit(gpa);
            try std.testing.expectEqualDeep(message.Control{
                .CONNECT = .{
                    .verbose = false,
                    .pedantic = false,
                    .tls_required = false,
                    .name = "NATS CLI Version v0.2.4",
                    .lang = "go",
                    .version = "1.43.0",
                    .protocol = 1,
                    .echo = true,
                    .headers = true,
                    .no_responders = true,
                },
            }, msg);
        }

        {
            const msg = try client.next();
            try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
        }
    }
}