summaryrefslogtreecommitdiff
path: root/src/server/message_parser.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/message_parser.zig')
-rw-r--r--src/server/message_parser.zig154
1 files changed, 116 insertions, 38 deletions
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 75e13d2..1a6b213 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -32,20 +32,48 @@ pub const MessageType = enum {
};
pub const Message = union(enum) {
- info: void,
-
+ /// TODO: REMOVE
+ not_real: void,
+ info: ServerInfo,
connect: Connect,
@"pub": Pub,
hpub: void,
- sub: void,
+ sub: Sub,
unsub: void,
- msg: void,
+ msg: Msg,
hmsg: void,
ping,
pong,
@"+ok": void,
@"-err": void,
- const Connect = struct {
+ pub const ServerInfo = struct {
+ /// The unique identifier of the NATS server.
+ server_id: []const u8,
+ /// The name of the NATS server.
+ server_name: []const u8,
+ /// The version of NATS.
+ version: []const u8,
+ /// The version of golang the NATS server was built with.
+ go: []const u8 = "0.0.0",
+ /// The IP address used to start the NATS server,
+ /// by default this will be 0.0.0.0 and can be
+ /// configured with -client_advertise host:port.
+ host: []const u8 = "0.0.0.0",
+ /// The port number the NATS server is configured
+ /// to listen on.
+ port: u16 = 4222,
+ /// Whether the server supports headers.
+ headers: bool = false,
+ /// Maximum payload size, in bytes, that the server
+ /// will accept from the client.
+ max_payload: u64,
+ /// An integer indicating the protocol version of
+ /// the server. The server version 1.2.0 sets this
+ /// to 1 to indicate that it supports the "Echo"
+ /// feature.
+ proto: u32 = 1,
+ };
+ pub const Connect = struct {
verbose: bool = false,
pedantic: bool = false,
tls_required: bool = false,
@@ -63,9 +91,26 @@ pub const Message = union(enum) {
headers: ?bool = null,
nkey: ?[]const u8 = null,
};
- const Pub = struct {
+ pub const Pub = struct {
+ /// The destination subject to publish to.
subject: []const u8,
+ /// The reply subject that subscribers can use to send a response back to the publisher/requestor.
reply_to: ?[]const u8 = null,
+ /// The message payload data.
+ payload: []const u8,
+ };
+ pub const Sub = struct {
+ /// The subject name to subscribe to.
+ subject: []const u8,
+ /// If specified, the subscriber will join this queue group.
+ queue_group: ?[]const u8,
+ /// A unique alphanumeric subscription ID, generated by the client.
+ sid: []const u8,
+ };
+ pub const Msg = struct {
+ subject: []const u8,
+ sid: []const u8,
+ reply_to: ?[]const u8,
payload: []const u8,
};
@@ -132,32 +177,7 @@ pub const Message = union(enum) {
try in.discardAll(1); // throw away space
// Parse subject
- const subject: []const u8 = blk: {
- // TODO: should be ARENA allocator
- var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
-
- // Handle the first character
- {
- const byte = try in.takeByte();
- if (byte == '.' or std.ascii.isWhitespace(byte))
- return error.InvalidSubject;
-
- try subject_list.append(alloc, byte);
- }
-
- while (in.takeByte() catch null) |byte| {
- if (std.ascii.isWhitespace(byte)) break;
- if (std.ascii.isAscii(byte)) {
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
- return error.InvalidSubject;
- }
- try subject_list.append(alloc, byte);
- }
- } else return error.InvalidStream;
- break :blk subject_list.items;
- };
+ const subject: []const u8 = try readSubject(alloc, in);
// Parse byte count
const byte_count = blk: {
@@ -186,12 +206,12 @@ pub const Message = union(enum) {
break :blk bytes;
};
- std.debug.print("buffer: '{s}'\n", .{in.buffered()});
- // return std.debug.panic("not implemented", .{});
- return .{ .@"pub" = .{
- .subject = subject,
- .payload = payload,
- } };
+ return .{
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ },
+ };
},
.ping => {
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
@@ -201,11 +221,69 @@ pub const Message = union(enum) {
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
return .pong;
},
+ .sub => {
+ std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
+ const subject = try readSubject(alloc, in);
+ const second = blk: {
+ // Drop whitespace
+ while (in.peekByte() catch null) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ } else break;
+ } else return error.InvalidStream;
+
+ var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32);
+ while (in.takeByte() catch null) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ } else return error.InvalidStream;
+
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+ const queue_group = if ((try in.peekByte()) != '\r') second else null;
+ const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
+ std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
+ return .{
+ .sub = .{
+ .subject = subject,
+ .queue_group = queue_group,
+ .sid = sid,
+ },
+ };
+ },
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
}
};
+fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
+ // TODO: should be ARENA allocator
+ var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
+
+ // Handle the first character
+ {
+ const byte = try in.takeByte();
+ std.debug.assert(!std.ascii.isWhitespace(byte));
+ if (byte == '.')
+ return error.InvalidSubject;
+
+ try subject_list.append(alloc, byte);
+ }
+
+ while (in.takeByte() catch null) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ if (std.ascii.isAscii(byte)) {
+ if (byte == '.') {
+ const next_byte = try in.peekByte();
+ if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
+ return error.InvalidSubject;
+ }
+ try subject_list.append(alloc, byte);
+ }
+ } else return error.InvalidStream;
+ return subject_list.toOwnedSlice(alloc);
+}
+
fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T {
var reader: std.json.Reader = .init(alloc, in);
return std.json.innerParse(T, alloc, &reader, .{