summaryrefslogtreecommitdiff
path: root/src/Server
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server')
-rw-r--r--src/Server/Client.zig192
-rw-r--r--src/Server/Payload.zig (renamed from src/Server/parse/Payload.zig)0
-rw-r--r--src/Server/message.zig (renamed from src/Server/parse/message.zig)30
-rw-r--r--src/Server/parse.zig1653
-rw-r--r--src/Server/serialize.zig6
5 files changed, 985 insertions, 896 deletions
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
index 9ec928c..047f38d 100644
--- a/src/Server/Client.zig
+++ b/src/Server/Client.zig
@@ -11,29 +11,23 @@ pub const Msgs = union(enum) {
};
connect: ?Message.Connect,
-// Used to own messages that we receive in our queues.
-alloc: std.mem.Allocator,
-
-// Messages for this client to receive.
-recv_queue: *Queue(Message),
-msg_queue: *Queue(Msgs),
+// Byte queue for this client to receive.
+recv_queue: *Queue(u8),
+// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes).
+recv_queue_write_lock: std.Io.Mutex = .init,
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
- alloc: std.mem.Allocator,
- recv_queue: *Queue(Message),
- msg_queue: *Queue(Msgs),
+ recv_queue: *Queue(u8),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
return .{
.connect = connect,
- .alloc = alloc,
.recv_queue = recv_queue,
- .msg_queue = msg_queue,
.from_client = in,
.to_client = out,
};
@@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
}
pub fn start(self: *Client, io: std.Io) !void {
- var msgs_buf: [1024]Msgs = undefined;
-
- var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
- errdefer _ = recv_msgs_task.cancel(io) catch {};
-
- var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
- errdefer _ = recv_proto_task.cancel(io) catch {};
-
+ std.debug.assert(self.to_client.buffer.len > 0);
+ std.debug.assert(self.to_client.end == 0);
while (true) {
- switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
- .msgs => |len_err| {
- @branchHint(.likely);
- const msgs = msgs_buf[0..try len_err];
- for (0..msgs.len) |i| {
- const msg = msgs[i];
- defer switch (msg) {
- .MSG => |m| m.deinit(self.alloc),
- .HMSG => |h| h.deinit(self.alloc),
- };
- errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
- .MSG => |m| {
- m.deinit(self.alloc);
- },
- .HMSG => |h| {
- h.deinit(self.alloc);
- },
- };
- switch (msg) {
- .MSG => |m| {
- try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
- },
- );
- try m.payload.write(self.to_client);
- try self.to_client.print("\r\n", .{});
- },
- .HMSG => |hmsg| {
- try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
- hmsg.msg.subject,
- hmsg.msg.sid,
- hmsg.msg.reply_to orelse "",
- hmsg.header_bytes,
- hmsg.msg.payload.len,
- });
- try hmsg.msg.payload.write(self.to_client);
- try self.to_client.print("\r\n", .{});
- },
- }
- }
- recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
- },
- .proto => |msg_err| {
- @branchHint(.unlikely);
- const msg = try msg_err;
- switch (msg) {
- .@"+OK" => {
- _ = try self.to_client.write("+OK\r\n");
- },
- .PONG => {
- _ = try self.to_client.write("PONG\r\n");
- },
- .INFO => |info| {
- _ = try self.to_client.write("INFO ");
- try std.json.Stringify.value(info, .{}, self.to_client);
- _ = try self.to_client.write("\r\n");
- },
- .@"-ERR" => |s| {
- _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
- },
- else => |m| {
- std.debug.panic("unimplemented write: {any}\n", .{m});
- },
- }
- recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
- },
- }
+ self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1);
try self.to_client.flush();
}
+ // while (true) {
+ // switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
+ // .msgs => |len_err| {
+ // @branchHint(.likely);
+ // const msgs = msgs_buf[0..try len_err];
+ // for (0..msgs.len) |i| {
+ // const msg = msgs[i];
+ // defer switch (msg) {
+ // .MSG => |m| m.deinit(self.alloc),
+ // .HMSG => |h| h.deinit(self.alloc),
+ // };
+ // errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
+ // .MSG => |m| {
+ // m.deinit(self.alloc);
+ // },
+ // .HMSG => |h| {
+ // h.deinit(self.alloc);
+ // },
+ // };
+ // switch (msg) {
+ // .MSG => |m| {
+ // try self.to_client.print(
+ // "MSG {s} {s} {s} {d}\r\n",
+ // .{
+ // m.subject,
+ // m.sid,
+ // m.reply_to orelse "",
+ // m.payload.len,
+ // },
+ // );
+ // try m.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ // },
+ // .HMSG => |hmsg| {
+ // try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
+ // hmsg.msg.subject,
+ // hmsg.msg.sid,
+ // hmsg.msg.reply_to orelse "",
+ // hmsg.header_bytes,
+ // hmsg.msg.payload.len,
+ // });
+ // try hmsg.msg.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ // },
+ // }
+ // }
+ // recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
+ // },
+ // .proto => |msg_err| {
+ // @branchHint(.unlikely);
+ // const msg = try msg_err;
+ // switch (msg) {
+ // .@"+OK" => {
+ // _ = try self.to_client.write("+OK\r\n");
+ // },
+ // .PONG => {
+ // _ = try self.to_client.write("PONG\r\n");
+ // },
+ // .INFO => |info| {
+ // _ = try self.to_client.write("INFO ");
+ // try std.json.Stringify.value(info, .{}, self.to_client);
+ // _ = try self.to_client.write("\r\n");
+ // },
+ // .@"-ERR" => |s| {
+ // _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ // },
+ // else => |m| {
+ // std.debug.panic("unimplemented write: {any}\n", .{m});
+ // },
+ // }
+ // recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
+ // },
+ // }
+ // try self.to_client.flush();
+ // }
}
-pub fn send(self: *Client, io: std.Io, msg: Message) !void {
- switch (msg) {
- .MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
- .HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
- else => try self.recv_queue.putOne(io, msg),
- }
+pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
+ try self.recv_queue.putAll(io, msg);
}
test send {
@@ -148,19 +136,15 @@ test send {
var buf: [1024]u8 = undefined;
break :blk &buf;
});
- var recv_queue: Queue(Message) = .init(&.{});
- var msgs_queue: Queue(Msgs) = .init(blk: {
- var buf: [1]Msgs = undefined;
- break :blk &buf;
- });
- var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
+ var recv_queue: Queue(u8) = .init(&.{});
+ var client: Client = .init(null, &recv_queue, undefined, &to_client);
defer client.deinit(gpa);
var c_task = try io.concurrent(Client.start, .{ &client, io });
defer c_task.cancel(io) catch {};
{
- try client.send(io, .PONG);
+ try client.send(io, "PONG\r\n");
// Wait for the concurrent client task to write to the writer
try io.sleep(.fromMilliseconds(1), .awake);
try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
diff --git a/src/Server/parse/Payload.zig b/src/Server/Payload.zig
index b512a81..b512a81 100644
--- a/src/Server/parse/Payload.zig
+++ b/src/Server/Payload.zig
diff --git a/src/Server/parse/message.zig b/src/Server/message.zig
index c8a308f..f22410f 100644
--- a/src/Server/parse/message.zig
+++ b/src/Server/message.zig
@@ -106,13 +106,7 @@ pub const Message = union(enum) {
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
reply_to: ?[]const u8 = null,
/// The message payload data.
- payload: Payload,
-
- pub fn deinit(self: Pub, alloc: Allocator) void {
- alloc.free(self.subject);
- self.payload.deinit(alloc);
- if (self.reply_to) |r| alloc.free(r);
- }
+ payload: []const u8,
pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
const res: Msg = .{
@@ -183,26 +177,6 @@ pub const Message = union(enum) {
subject: []const u8,
sid: []const u8,
reply_to: ?[]const u8,
- payload: Payload,
-
- pub fn deinit(self: Msg, alloc: Allocator) void {
- alloc.free(self.subject);
- alloc.free(self.sid);
- if (self.reply_to) |r| alloc.free(r);
- self.payload.deinit(alloc);
- }
-
- pub fn dupe(self: Msg, alloc: Allocator) !Msg {
- var res: Msg = undefined;
- res.subject = try alloc.dupe(u8, self.subject);
- errdefer alloc.free(res.subject);
- res.sid = try alloc.dupe(u8, self.sid);
- errdefer alloc.free(res.sid);
- res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
- errdefer if (res.reply_to) |r| alloc.free(r);
- res.payload = try self.payload.dupe(alloc);
- errdefer alloc.free(res.payload);
- return res;
- }
+ payload: []const u8,
};
};
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 70dd2bc..6e013c4 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -15,9 +15,9 @@ const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
-const message = @import("./parse/message.zig");
+const message = @import("./message.zig");
pub const Message = message.Message;
-pub const Payload = @import("./parse/Payload.zig");
+pub const Payload = @import("./Payload.zig");
const client_control = StaticStringMap(message.Control).initComptime(
.{
@@ -35,801 +35,926 @@ const client_control = StaticStringMap(message.Control).initComptime(
// {"-ERR", .@"-err"},
},
);
-fn parseStaticStringMap(input: []const u8) ?message.Control {
- return client_control.get(input);
-}
-
-/// Parse a string into its associated MessageType.
-const parse = parseStaticStringMap;
-/// Get the next Message from the input stream.
-pub fn next(alloc: Allocator, in: *Reader) !Message {
- var operation_string: ArrayList(u8) = blk: {
- comptime var buf_len = 0;
- comptime {
- for (client_control.keys()) |key| {
- buf_len = @max(buf_len, key.len);
- }
+pub fn control(in: *Reader) !message.Control {
+ const longest_ctrl = comptime blk: {
+ var min_len = 0;
+ for (client_control.keys()) |ctrl| {
+ min_len = @max(ctrl.len, min_len);
}
- var buf: [buf_len]u8 = undefined;
- break :blk .initBuffer(&buf);
- };
-
- while (in.peekByte()) |byte| {
- if (isUpper(byte)) {
- try operation_string.appendBounded(byte);
- in.toss(1);
- } else break;
- } else |err| return err;
-
- const operation = parse(operation_string.items) orelse {
- log.err("Invalid operation: '{s}'", .{operation_string.items});
- return error.InvalidOperation;
- };
-
- errdefer log.err("Failed to parse {s}", .{operation_string.items});
-
- switch (operation) {
- .CONNECT => return connect(alloc, in),
- .PUB => {
- @branchHint(.likely);
- return @"pub"(alloc, in);
- },
- .HPUB => {
- @branchHint(.likely);
- return hpub(alloc, in);
- },
- .PING => {
- try expectStreamBytes(in, "\r\n");
- return .PING;
- },
- .PONG => {
- try expectStreamBytes(in, "\r\n");
- return .PONG;
- },
- .SUB => return sub(alloc, in),
- .UNSUB => return unsub(alloc, in),
- else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
- }
-}
-
-pub fn connect(alloc: Allocator, in: *Reader) !Message {
- // for storing the json string
- var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
- defer connect_string_writer_allocating.deinit();
- var connect_string_writer = &connect_string_writer_allocating.writer;
-
- // for parsing the json string
- var connect_arena_allocator: ArenaAllocator = .init(alloc);
- defer connect_arena_allocator.deinit();
- const connect_allocator = connect_arena_allocator.allocator();
-
- try in.discardAll(1); // throw away space
-
- // Should read the next JSON object to the fixed buffer writer.
- _ = try in.streamDelimiter(connect_string_writer, '}');
- try connect_string_writer.writeByte('}');
- try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
-
- const connect_str = try connect_string_writer_allocating.toOwnedSlice();
- defer alloc.free(connect_str);
- const res = try std.json.parseFromSliceLeaky(
- Message.Connect,
- connect_allocator,
- connect_str,
- .{ .allocate = .alloc_always },
- );
-
- return .{ .CONNECT = try res.dupe(alloc) };
-}
-
-pub fn sub(alloc: Allocator, in: *Reader) !Message {
- try in.discardAll(1); // throw away space
- const subject = try readSubject(alloc, in, .sub);
-
- const States = enum {
- before_second,
- in_second,
- after_second,
- in_third,
- in_end,
- };
-
- var second: ArrayList(u8) = .empty;
- errdefer second.deinit(alloc);
- var third: ?ArrayList(u8) = null;
- errdefer if (third) |*t| t.deinit(alloc);
-
- sw: switch (@as(States, .before_second)) {
- .before_second => {
- const byte = try in.peekByte();
- if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .before_second;
- }
- continue :sw .in_second;
- },
- .in_second => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .after_second;
- },
- .after_second => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- } else if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .after_second;
- }
- third = .empty;
- continue :sw .in_third;
- },
- .in_third => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .in_end;
- },
- .in_end => {
- try expectStreamBytes(in, "\r\n");
- },
- }
-
- return .{
- .SUB = .{
- .subject = subject,
- .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
- .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
- },
- };
-}
-
-pub fn unsub(alloc: Allocator, in: *Reader) !Message {
- const States = enum {
- before_first,
- in_first,
- after_first,
- in_second,
- in_end,
- };
-
- var first: ArrayList(u8) = .empty;
- errdefer first.deinit(alloc);
- var second: ?ArrayList(u8) = null;
- defer if (second) |*s| s.deinit(alloc);
-
- sw: switch (@as(States, .before_first)) {
- .before_first => {
- const byte = try in.peekByte();
- if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .before_first;
- }
- continue :sw .in_first;
- },
- .in_first => {
- const byte = try in.peekByte();
- if (!isWhitespace(byte)) {
- try first.append(alloc, byte);
- in.toss(1);
- continue :sw .in_first;
- }
- continue :sw .after_first;
- },
- .after_first => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- } else if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .after_first;
- }
- second = .empty;
- continue :sw .in_second;
- },
- .in_second => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- }
- try second.?.append(alloc, byte);
- in.toss(1);
- continue :sw .in_second;
- },
- .in_end => {
- try expectStreamBytes(in, "\r\n");
- },
- }
-
- return .{
- .UNSUB = .{
- .sid = try first.toOwnedSlice(alloc),
- .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
- },
- };
-}
-
-pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
- try in.discardAll(1); // throw away space
-
- // Parse subject
- const subject: []const u8 = try readSubject(alloc, in, .@"pub");
- errdefer alloc.free(subject);
-
- const States = enum {
- before_second,
- in_second,
- after_second,
- in_third,
- in_end,
- };
-
- var second: ArrayList(u8) = .empty;
- defer second.deinit(alloc);
- var third: ?ArrayList(u8) = null;
- defer if (third) |*t| t.deinit(alloc);
-
- sw: switch (@as(States, .before_second)) {
- .before_second => {
- // Drop whitespace
- const byte = try in.peekByte();
- if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .before_second;
- }
- continue :sw .in_second;
- },
- .in_second => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .after_second;
- },
- .after_second => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- } else if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .after_second;
- }
- third = .empty;
- continue :sw .in_third;
- },
- .in_third => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .in_end;
- },
- .in_end => {
- try expectStreamBytes(in, "\r\n");
- },
- }
-
- const reply_to: ?[]const u8, const bytes: usize =
- if (third) |t| .{
- try alloc.dupe(u8, second.items),
- try parseUnsigned(usize, t.items, 10),
- } else .{
- null,
- try parseUnsigned(usize, second.items, 10),
- };
-
- const payload: Payload = try .read(alloc, in, bytes);
- errdefer payload.deinit(alloc);
- try expectStreamBytes(in, "\r\n");
-
- return .{
- .PUB = .{
- .subject = subject,
- .payload = payload,
- .reply_to = reply_to,
- },
- };
-}
-
-pub fn hpub(alloc: Allocator, in: *Reader) !Message {
- try in.discardAll(1); // throw away space
-
- // Parse subject
- const subject: []const u8 = try readSubject(alloc, in, .@"pub");
- errdefer alloc.free(subject);
-
- const States = enum {
- before_second,
- in_second,
- after_second,
- in_third,
- after_third,
- in_fourth,
- in_end,
+ break :blk min_len;
};
-
- var second: ArrayList(u8) = .empty;
- defer second.deinit(alloc);
- var third: ArrayList(u8) = .empty;
- defer third.deinit(alloc);
- var fourth: ?ArrayList(u8) = null;
- defer if (fourth) |*f| f.deinit(alloc);
-
- sw: switch (@as(States, .before_second)) {
- .before_second => {
- // Drop whitespace
- const byte = try in.peekByte();
- if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .before_second;
- }
- continue :sw .in_second;
- },
- .in_second => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .after_second;
- },
- .after_second => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- } else if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .after_second;
+ std.debug.assert(in.buffer.len >= longest_ctrl);
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+ if (iter.next()) |str| {
+ if (client_control.get(str)) |ctrl| {
+ in.toss(str.len);
+ return ctrl;
+ } else if (str.len >= longest_ctrl) {
+ return error.InvalidControl;
}
- third = .empty;
- continue :sw .in_third;
- },
- .in_third => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .after_third;
- },
- .after_third => {
- const byte = try in.peekByte();
- if (byte == '\r') {
- continue :sw .in_end;
- } else if (isWhitespace(byte)) {
- in.toss(1);
- continue :sw .after_third;
- }
- fourth = .empty;
- continue :sw .in_fourth;
- },
- .in_fourth => {
- for (1..in.buffer.len) |i| {
- try in.fill(i + 1);
- if (isWhitespace(in.buffered()[i])) {
- @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
- in.toss(i);
- break;
- }
- } else return error.EndOfStream;
- continue :sw .in_end;
- },
- .in_end => {
- try expectStreamBytes(in, "\r\n");
- },
- }
-
- const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
- if (fourth) |f| .{
- try alloc.dupe(u8, second.items),
- try parseUnsigned(usize, third.items, 10),
- try parseUnsigned(usize, f.items, 10),
- } else .{
- null,
- try parseUnsigned(usize, second.items, 10),
- try parseUnsigned(usize, third.items, 10),
- };
-
- const payload: Payload = try .read(alloc, in, total_bytes);
- errdefer payload.deinit(alloc);
- try expectStreamBytes(in, "\r\n");
-
- return .{
- .HPUB = .{
- .header_bytes = header_bytes,
- .@"pub" = .{
- .subject = subject,
- .payload = payload,
- .reply_to = reply_to,
- },
- },
- };
-}
-
-fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
- var subject_list: ArrayList(u8) = .empty;
- errdefer subject_list.deinit(alloc);
-
- // Handle the first character
- {
- const byte = try in.takeByte();
- if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
- return error.InvalidStream;
-
- try subject_list.append(alloc, byte);
- }
-
- switch (pub_or_sub) {
- .sub => {
- while (in.takeByte()) |byte| {
- if (isWhitespace(byte)) break;
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '>') {
- const next_byte = try in.takeByte();
- if (!isWhitespace(next_byte))
- return error.InvalidStream;
- } else if (byte == '*') {
- const next_byte = try in.peekByte();
- if (next_byte != '.' and !isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
- } else |err| return err;
- },
- .@"pub" => {
- while (in.takeByte()) |byte| {
- if (isWhitespace(byte)) break;
- if (byte == '*' or byte == '>') return error.InvalidStream;
- if (byte == '.') {
- const next_byte = try in.peekByte();
- if (next_byte == '.' or isWhitespace(next_byte))
- return error.InvalidStream;
- }
- try subject_list.append(alloc, byte);
- } else |err| return err;
- },
- }
-
- return subject_list.toOwnedSlice(alloc);
-}
-
-inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
- if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
- @branchHint(.unlikely);
- return error.InvalidStream;
+ }
+ try in.fillMore();
}
}
-test sub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
+test control {
{
- var in: Reader = .fixed(" foo 1\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = null,
- .sid = "1",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "PUB " },
+ });
+ try std.testing.expectEqual(.PUB, try control(&in.interface));
+ try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
}
{
- var in: Reader = .fixed(" foo 1\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = null,
- .sid = "1",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "PUB" },
+ });
+ try std.testing.expectEqual(.PUB, try control(&in.interface));
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
{
- var in: Reader = .fixed(" foo q 1\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo",
- .queue_group = "q",
- .sid = "1",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "PU" },
+ .{ .buffer = "B" },
+ });
+ try std.testing.expectEqual(.PUB, try control(&in.interface));
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
{
- var in: Reader = .fixed(" 1 q 1\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "1",
- .queue_group = "q",
- .sid = "1",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "PIN" },
+ .{ .buffer = "G\r\n" },
+ });
+ try std.testing.expectEqual(.PING, try control(&in.interface));
+ try std.testing.expectEqualSlices(u8, "\r\n", in.interface.buffered());
}
{
- var in: Reader = .fixed(" $SRV.PING 4\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "$SRV.PING",
- .queue_group = null,
- .sid = "4",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "CONNECT" },
+ });
+ try std.testing.expectEqual(.CONNECT, try control(&in.interface));
}
{
- var in: Reader = .fixed(" foo.echo q 10\r\n");
- var res = try sub(alloc, &in);
- defer res.SUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .SUB = .{
- .subject = "foo.echo",
- .queue_group = "q",
- .sid = "10",
- },
- },
- res,
- );
+ var buf: [7]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "CONNECC" },
+ });
+ try std.testing.expectError(error.InvalidControl, control(&in.interface));
}
}
-test unsub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const expectEqual = std.testing.expectEqual;
- {
- var in: Reader = .fixed(" 1\r\n");
- var res = try unsub(alloc, &in);
- defer res.UNSUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .UNSUB = .{
- .sid = "1",
- .max_msgs = null,
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn @"pub"(in: *Reader) !Message.Pub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |subject| {
+ if (iter.next()) |second| {
+ if (in.buffered()[iter.index] == '\r') {
+ const bytes_str = second;
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
+
+ // 4 bytes for CRLF on either side of the payload.
+ in.toss(iter.index + bytes + 4);
+ return .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = iter.rest()[1 .. 1 + bytes],
+ };
+ }
- {
- var in: Reader = .fixed(" 1 1\r\n");
- var res = try unsub(alloc, &in);
- defer res.UNSUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .UNSUB = .{
- .sid = "1",
- .max_msgs = 1,
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
+ const reply_to = second;
+ if (iter.next()) |bytes_str| {
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
+
+ if (in.buffered()[iter.index] == '\r') {
+ if (iter.rest().len > bytes) {
+ // 4 bytes for CRLF on either side of the payload.
+ in.toss(iter.index + bytes + 4);
+ return .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = iter.rest()[1 .. 1 + bytes],
+ };
+ }
+ }
+ }
+ }
+ }
+
+ try in.fillMore();
}
}
test @"pub" {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const expectEqual = std.testing.expectEqual;
{
- var in: Reader = .fixed(" foo 3\r\nbar\r\n");
- var res = try @"pub"(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = null,
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo bar 2\r\nhi\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = "bar",
+ .payload = "hi",
},
- res,
+ try @"pub"(&in.interface),
);
- try expectEqual(0, in.buffered().len);
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
-
{
- var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
- var res = try @"pub"(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = "reply.to",
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo 2\r\nhi\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
},
- res,
+ try @"pub"(&in.interface),
);
- try expectEqual(0, in.buffered().len);
- }
-
- // numeric reply subject
- {
- var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
- var res = try @"pub"(alloc, &in);
- defer res.PUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .PUB = .{
- .subject = "foo",
- .reply_to = "5",
- .payload = .{
- .len = 3,
- .short = blk: {
- var s: [128]u8 = undefined;
- @memcpy(s[0..3], "bar");
- break :blk s;
- },
- .long = null,
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
}
-test hpub {
- const alloc = std.testing.allocator;
- const expectEqualDeep = std.testing.expectEqualDeep;
- const expectEqual = std.testing.expectEqual;
- {
- var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try hpub(alloc, &in);
- defer res.HPUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .HPUB = .{
- .header_bytes = 22,
- .@"pub" = .{
- .subject = "foo",
- .reply_to = null,
- .payload = .{
- .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
- .short = blk: {
- var s: [128]u8 = undefined;
- const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
- @memcpy(s[0..str.len], str);
- break :blk s;
- },
- .long = null,
- },
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-
- {
- var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try hpub(alloc, &in);
- defer res.HPUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .HPUB = .{
- .header_bytes = 22,
- .@"pub" = .{
- .subject = "foo",
- .reply_to = "reply.to",
- .payload = .{
- .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
- .short = blk: {
- var s: [128]u8 = undefined;
- const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
- @memcpy(s[0..str.len], str);
- break :blk s;
- },
- .long = null,
- },
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-
- {
- var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
- var res = try hpub(alloc, &in);
- defer res.HPUB.deinit(alloc);
- try expectEqualDeep(
- Message{
- .HPUB = .{
- .header_bytes = 22,
- .@"pub" = .{
- .subject = "foo",
- .reply_to = "6",
- .payload = .{
- .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
- .short = blk: {
- var s: [128]u8 = undefined;
- const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
- @memcpy(s[0..str.len], str);
- break :blk s;
- },
- .long = null,
- },
- },
- },
- },
- res,
- );
- try expectEqual(0, in.buffered().len);
- }
-}
+// /// Get the next Message from the input stream.
+// pub fn next(alloc: Allocator, in: *Reader) !Message {
+// errdefer log.err("Failed to parse {s}", .{operation_string.items});
+
+// switch (operation) {
+// .CONNECT => return connect(alloc, in),
+// .PUB => {
+// @branchHint(.likely);
+// return @"pub"(alloc, in);
+// },
+// .HPUB => {
+// @branchHint(.likely);
+// return hpub(alloc, in);
+// },
+// .PING => {
+// try expectStreamBytes(in, "\r\n");
+// return .PING;
+// },
+// .PONG => {
+// try expectStreamBytes(in, "\r\n");
+// return .PONG;
+// },
+// .SUB => return sub(alloc, in),
+// .UNSUB => return unsub(alloc, in),
+// else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
+// }
+// }
+
+// pub fn connect(alloc: Allocator, in: *Reader) !Message {
+// // for storing the json string
+// var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+// defer connect_string_writer_allocating.deinit();
+// var connect_string_writer = &connect_string_writer_allocating.writer;
+
+// // for parsing the json string
+// var connect_arena_allocator: ArenaAllocator = .init(alloc);
+// defer connect_arena_allocator.deinit();
+// const connect_allocator = connect_arena_allocator.allocator();
+
+// try in.discardAll(1); // throw away space
+
+// // Should read the next JSON object to the fixed buffer writer.
+// _ = try in.streamDelimiter(connect_string_writer, '}');
+// try connect_string_writer.writeByte('}');
+// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+// const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+// defer alloc.free(connect_str);
+// const res = try std.json.parseFromSliceLeaky(
+// Message.Connect,
+// connect_allocator,
+// connect_str,
+// .{ .allocate = .alloc_always },
+// );
+
+// return .{ .CONNECT = try res.dupe(alloc) };
+// }
+
+// pub fn sub(alloc: Allocator, in: *Reader) !Message {
+// try in.discardAll(1); // throw away space
+// const subject = try readSubject(alloc, in, .sub);
+
+// const States = enum {
+// before_second,
+// in_second,
+// after_second,
+// in_third,
+// in_end,
+// };
+
+// var second: ArrayList(u8) = .empty;
+// errdefer second.deinit(alloc);
+// var third: ?ArrayList(u8) = null;
+// errdefer if (third) |*t| t.deinit(alloc);
+
+// sw: switch (@as(States, .before_second)) {
+// .before_second => {
+// const byte = try in.peekByte();
+// if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .before_second;
+// }
+// continue :sw .in_second;
+// },
+// .in_second => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .after_second;
+// },
+// .after_second => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// } else if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .after_second;
+// }
+// third = .empty;
+// continue :sw .in_third;
+// },
+// .in_third => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .in_end;
+// },
+// .in_end => {
+// try expectStreamBytes(in, "\r\n");
+// },
+// }
+
+// return .{
+// .SUB = .{
+// .subject = subject,
+// .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
+// .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
+// },
+// };
+// }
+
+// pub fn unsub(alloc: Allocator, in: *Reader) !Message {
+// const States = enum {
+// before_first,
+// in_first,
+// after_first,
+// in_second,
+// in_end,
+// };
+
+// var first: ArrayList(u8) = .empty;
+// errdefer first.deinit(alloc);
+// var second: ?ArrayList(u8) = null;
+// defer if (second) |*s| s.deinit(alloc);
+
+// sw: switch (@as(States, .before_first)) {
+// .before_first => {
+// const byte = try in.peekByte();
+// if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .before_first;
+// }
+// continue :sw .in_first;
+// },
+// .in_first => {
+// const byte = try in.peekByte();
+// if (!isWhitespace(byte)) {
+// try first.append(alloc, byte);
+// in.toss(1);
+// continue :sw .in_first;
+// }
+// continue :sw .after_first;
+// },
+// .after_first => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// } else if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .after_first;
+// }
+// second = .empty;
+// continue :sw .in_second;
+// },
+// .in_second => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// }
+// try second.?.append(alloc, byte);
+// in.toss(1);
+// continue :sw .in_second;
+// },
+// .in_end => {
+// try expectStreamBytes(in, "\r\n");
+// },
+// }
+
+// return .{
+// .UNSUB = .{
+// .sid = try first.toOwnedSlice(alloc),
+// .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
+// },
+// };
+// }
+
+// pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
+// try in.discardAll(1); // throw away space
+
+// // Parse subject
+// const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+// errdefer alloc.free(subject);
+
+// const States = enum {
+// before_second,
+// in_second,
+// after_second,
+// in_third,
+// in_end,
+// };
+
+// var second: ArrayList(u8) = .empty;
+// defer second.deinit(alloc);
+// var third: ?ArrayList(u8) = null;
+// defer if (third) |*t| t.deinit(alloc);
+
+// sw: switch (@as(States, .before_second)) {
+// .before_second => {
+// // Drop whitespace
+// const byte = try in.peekByte();
+// if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .before_second;
+// }
+// continue :sw .in_second;
+// },
+// .in_second => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .after_second;
+// },
+// .after_second => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// } else if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .after_second;
+// }
+// third = .empty;
+// continue :sw .in_third;
+// },
+// .in_third => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .in_end;
+// },
+// .in_end => {
+// try expectStreamBytes(in, "\r\n");
+// },
+// }
+
+// const reply_to: ?[]const u8, const bytes: usize =
+// if (third) |t| .{
+// try alloc.dupe(u8, second.items),
+// try parseUnsigned(usize, t.items, 10),
+// } else .{
+// null,
+// try parseUnsigned(usize, second.items, 10),
+// };
+
+// const payload: Payload = try .read(alloc, in, bytes);
+// errdefer payload.deinit(alloc);
+// try expectStreamBytes(in, "\r\n");
+
+// return .{
+// .PUB = .{
+// .subject = subject,
+// .payload = payload,
+// .reply_to = reply_to,
+// },
+// };
+// }
+
+// pub fn hpub(alloc: Allocator, in: *Reader) !Message {
+// try in.discardAll(1); // throw away space
+
+// // Parse subject
+// const subject: []const u8 = try readSubject(alloc, in, .@"pub");
+// errdefer alloc.free(subject);
+
+// const States = enum {
+// before_second,
+// in_second,
+// after_second,
+// in_third,
+// after_third,
+// in_fourth,
+// in_end,
+// };
+
+// var second: ArrayList(u8) = .empty;
+// defer second.deinit(alloc);
+// var third: ArrayList(u8) = .empty;
+// defer third.deinit(alloc);
+// var fourth: ?ArrayList(u8) = null;
+// defer if (fourth) |*f| f.deinit(alloc);
+
+// sw: switch (@as(States, .before_second)) {
+// .before_second => {
+// // Drop whitespace
+// const byte = try in.peekByte();
+// if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .before_second;
+// }
+// continue :sw .in_second;
+// },
+// .in_second => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .after_second;
+// },
+// .after_second => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// } else if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .after_second;
+// }
+// third = .empty;
+// continue :sw .in_third;
+// },
+// .in_third => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .after_third;
+// },
+// .after_third => {
+// const byte = try in.peekByte();
+// if (byte == '\r') {
+// continue :sw .in_end;
+// } else if (isWhitespace(byte)) {
+// in.toss(1);
+// continue :sw .after_third;
+// }
+// fourth = .empty;
+// continue :sw .in_fourth;
+// },
+// .in_fourth => {
+// for (1..in.buffer.len) |i| {
+// try in.fill(i + 1);
+// if (isWhitespace(in.buffered()[i])) {
+// @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
+// in.toss(i);
+// break;
+// }
+// } else return error.EndOfStream;
+// continue :sw .in_end;
+// },
+// .in_end => {
+// try expectStreamBytes(in, "\r\n");
+// },
+// }
+
+// const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
+// if (fourth) |f| .{
+// try alloc.dupe(u8, second.items),
+// try parseUnsigned(usize, third.items, 10),
+// try parseUnsigned(usize, f.items, 10),
+// } else .{
+// null,
+// try parseUnsigned(usize, second.items, 10),
+// try parseUnsigned(usize, third.items, 10),
+// };
+
+// const payload: Payload = try .read(alloc, in, total_bytes);
+// errdefer payload.deinit(alloc);
+// try expectStreamBytes(in, "\r\n");
+
+// return .{
+// .HPUB = .{
+// .header_bytes = header_bytes,
+// .@"pub" = .{
+// .subject = subject,
+// .payload = payload,
+// .reply_to = reply_to,
+// },
+// },
+// };
+// }
+
+// fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
+// var subject_list: ArrayList(u8) = .empty;
+// errdefer subject_list.deinit(alloc);
+
+// // Handle the first character
+// {
+// const byte = try in.takeByte();
+// if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
+// return error.InvalidStream;
+
+// try subject_list.append(alloc, byte);
+// }
+
+// switch (pub_or_sub) {
+// .sub => {
+// while (in.takeByte()) |byte| {
+// if (isWhitespace(byte)) break;
+// if (byte == '.') {
+// const next_byte = try in.peekByte();
+// if (next_byte == '.' or isWhitespace(next_byte))
+// return error.InvalidStream;
+// } else if (byte == '>') {
+// const next_byte = try in.takeByte();
+// if (!isWhitespace(next_byte))
+// return error.InvalidStream;
+// } else if (byte == '*') {
+// const next_byte = try in.peekByte();
+// if (next_byte != '.' and !isWhitespace(next_byte))
+// return error.InvalidStream;
+// }
+// try subject_list.append(alloc, byte);
+// } else |err| return err;
+// },
+// .@"pub" => {
+// while (in.takeByte()) |byte| {
+// if (isWhitespace(byte)) break;
+// if (byte == '*' or byte == '>') return error.InvalidStream;
+// if (byte == '.') {
+// const next_byte = try in.peekByte();
+// if (next_byte == '.' or isWhitespace(next_byte))
+// return error.InvalidStream;
+// }
+// try subject_list.append(alloc, byte);
+// } else |err| return err;
+// },
+// }
+
+// return subject_list.toOwnedSlice(alloc);
+// }
+
+// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+// @branchHint(.unlikely);
+// return error.InvalidStream;
+// }
+// }
+
+// test sub {
+// const alloc = std.testing.allocator;
+// const expectEqualDeep = std.testing.expectEqualDeep;
+// {
+// var in: Reader = .fixed(" foo 1\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "foo",
+// .queue_group = null,
+// .sid = "1",
+// },
+// },
+// res,
+// );
+// }
+// {
+// var in: Reader = .fixed(" foo 1\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "foo",
+// .queue_group = null,
+// .sid = "1",
+// },
+// },
+// res,
+// );
+// }
+// {
+// var in: Reader = .fixed(" foo q 1\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "foo",
+// .queue_group = "q",
+// .sid = "1",
+// },
+// },
+// res,
+// );
+// }
+// {
+// var in: Reader = .fixed(" 1 q 1\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "1",
+// .queue_group = "q",
+// .sid = "1",
+// },
+// },
+// res,
+// );
+// }
+// {
+// var in: Reader = .fixed(" $SRV.PING 4\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "$SRV.PING",
+// .queue_group = null,
+// .sid = "4",
+// },
+// },
+// res,
+// );
+// }
+// {
+// var in: Reader = .fixed(" foo.echo q 10\r\n");
+// var res = try sub(alloc, &in);
+// defer res.SUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .SUB = .{
+// .subject = "foo.echo",
+// .queue_group = "q",
+// .sid = "10",
+// },
+// },
+// res,
+// );
+// }
+// }
+
+// test unsub {
+// const alloc = std.testing.allocator;
+// const expectEqualDeep = std.testing.expectEqualDeep;
+// const expectEqual = std.testing.expectEqual;
+// {
+// var in: Reader = .fixed(" 1\r\n");
+// var res = try unsub(alloc, &in);
+// defer res.UNSUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .UNSUB = .{
+// .sid = "1",
+// .max_msgs = null,
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+
+// {
+// var in: Reader = .fixed(" 1 1\r\n");
+// var res = try unsub(alloc, &in);
+// defer res.UNSUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .UNSUB = .{
+// .sid = "1",
+// .max_msgs = 1,
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+// }
+
+// test @"pub" {
+// const alloc = std.testing.allocator;
+// const expectEqualDeep = std.testing.expectEqualDeep;
+// const expectEqual = std.testing.expectEqual;
+// {
+// var in: Reader = .fixed(" foo 3\r\nbar\r\n");
+// var res = try @"pub"(alloc, &in);
+// defer res.PUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .PUB = .{
+// .subject = "foo",
+// .reply_to = null,
+// .payload = .{
+// .len = 3,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// @memcpy(s[0..3], "bar");
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+
+// {
+// var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+// var res = try @"pub"(alloc, &in);
+// defer res.PUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .PUB = .{
+// .subject = "foo",
+// .reply_to = "reply.to",
+// .payload = .{
+// .len = 3,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// @memcpy(s[0..3], "bar");
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+
+// // numeric reply subject
+// {
+// var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
+// var res = try @"pub"(alloc, &in);
+// defer res.PUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .PUB = .{
+// .subject = "foo",
+// .reply_to = "5",
+// .payload = .{
+// .len = 3,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// @memcpy(s[0..3], "bar");
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+// }
+
+// test hpub {
+// const alloc = std.testing.allocator;
+// const expectEqualDeep = std.testing.expectEqualDeep;
+// const expectEqual = std.testing.expectEqual;
+// {
+// var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+// var res = try hpub(alloc, &in);
+// defer res.HPUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .HPUB = .{
+// .header_bytes = 22,
+// .@"pub" = .{
+// .subject = "foo",
+// .reply_to = null,
+// .payload = .{
+// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+// @memcpy(s[0..str.len], str);
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+
+// {
+// var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+// var res = try hpub(alloc, &in);
+// defer res.HPUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .HPUB = .{
+// .header_bytes = 22,
+// .@"pub" = .{
+// .subject = "foo",
+// .reply_to = "reply.to",
+// .payload = .{
+// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+// @memcpy(s[0..str.len], str);
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+
+// {
+// var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
+// var res = try hpub(alloc, &in);
+// defer res.HPUB.deinit(alloc);
+// try expectEqualDeep(
+// Message{
+// .HPUB = .{
+// .header_bytes = 22,
+// .@"pub" = .{
+// .subject = "foo",
+// .reply_to = "6",
+// .payload = .{
+// .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
+// .short = blk: {
+// var s: [128]u8 = undefined;
+// const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
+// @memcpy(s[0..str.len], str);
+// break :blk s;
+// },
+// .long = null,
+// },
+// },
+// },
+// },
+// res,
+// );
+// try expectEqual(0, in.buffered().len);
+// }
+// }
diff --git a/src/Server/serialize.zig b/src/Server/serialize.zig
new file mode 100644
index 0000000..b79beca
--- /dev/null
+++ b/src/Server/serialize.zig
@@ -0,0 +1,6 @@
+const std = @import("std");
+
+const message = @import("./message.zig");
+const Message = message.Message;
+
+pub fn message(msg: Message)