summaryrefslogtreecommitdiff
path: root/src/server/main.zig
blob: b26f4b0b93c9e5800a47d8adad5f9a303bd6463d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
const std = @import("std");
const Message = @import("./message_parser.zig");

const ClientState = @import("./client.zig");

pub const ServerInfo = struct {
    /// The unique identifier of the NATS server.
    server_id: []const u8,
    /// The name of the NATS server.
    server_name: []const u8,
    /// The version of NATS.
    version: []const u8,
    /// The version of golang the NATS server was built with.
    go: []const u8 = "0.0.0",
    /// The IP address used to start the NATS server,
    /// by default this will be 0.0.0.0 and can be
    /// configured with -client_advertise host:port.
    host: []const u8 = "0.0.0.0",
    /// The port number the NATS server is configured
    /// to listen on.
    port: u16 = 4222,
    /// Whether the server supports headers.
    headers: bool = false,
    /// Maximum payload size, in bytes, that the server
    /// will accept from the client.
    max_payload: u64,
    /// An integer indicating the protocol version of
    /// the server. The server version 1.2.0 sets this
    /// to 1 to indicate that it supports the "Echo"
    /// feature.
    proto: u32 = 1,
};

server_info: ServerInfo,
clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty,
/// Map of subjects to client IDs that are subscribed to that subject.
subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)),

pub fn main(gpa: std.mem.Allocator, main_args: anytype) !void {
    _ = gpa;
    _ = main_args;
}

fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
    defer stream.close(io);
    var w_buffer: [1024]u8 = undefined;
    var writer = stream.writer(io, &w_buffer);
    const out = &writer.interface;

    var r_buffer: [8192]u8 = undefined;
    var reader = stream.reader(io, &r_buffer);
    const in = &reader.interface;

    processClient(allocator, in, out, info) catch |err| {
        std.debug.panic("Error processing client: {}\n", .{err});
    };
}

fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
    try writeInfo(out, info);

    var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
    defer client_state_arena.deinit();
    const client_state = (try Message.next(client_state_arena.allocator(), in)).connect;
    _ = client_state;

    var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
    defer message_parsing_arena.deinit();
    const message_parsing_allocator = message_parsing_arena.allocator();
    while (true) {
        defer _ = message_parsing_arena.reset(.retain_capacity);
        const next_message = Message.next(message_parsing_allocator, in) catch |err| {
            switch (err) {
                error.EndOfStream => {
                    break;
                },
                else => {
                    return err;
                },
            }
        };
        switch (next_message) {
            .connect => |connect| {
                std.debug.panic("Connection message after already connected: {any}\n", .{connect});
            },
            .ping => try writePong(out),
            .@"pub" => try writeOk(out),
            else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}),
        }
    }
}

fn writeOk(out: *std.Io.Writer) !void {
    _ = try out.write("+OK\r\n");
    try out.flush();
}

fn writePong(out: *std.Io.Writer) !void {
    _ = try out.write("PONG\r\n");
    try out.flush();
}

fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
    _ = try out.write("INFO ");
    try std.json.Stringify.value(info, .{}, out);
    _ = try out.write("\r\n");
    try out.flush();
}

pub fn createId() []const u8 {
    return "SERVERID";
}

pub fn createName() []const u8 {
    return "SERVERNAME";
}