summaryrefslogtreecommitdiff
path: root/src/Server/parse.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/Server/parse.zig')
-rw-r--r--src/Server/parse.zig384
1 files changed, 327 insertions, 57 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 6e013c4..9035311 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -16,8 +16,8 @@ const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
const message = @import("./message.zig");
-pub const Message = message.Message;
-pub const Payload = @import("./Payload.zig");
+const Message = message.Message;
+const Payload = @import("./Payload.zig");
const client_control = StaticStringMap(message.Control).initComptime(
.{
@@ -44,8 +44,10 @@ pub fn control(in: *Reader) !message.Control {
}
break :blk min_len;
};
+ // log.debug("buffered: '{s}'", .{in.buffered()});
std.debug.assert(in.buffer.len >= longest_ctrl);
while (true) {
+ // log.debug("buffered l: '{s}'", .{in.buffered()});
var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
if (iter.next()) |str| {
if (client_control.get(str)) |ctrl| {
@@ -121,32 +123,39 @@ pub fn @"pub"(in: *Reader) !Message.Pub {
if (iter.next()) |subject| {
if (iter.next()) |second| {
- if (in.buffered()[iter.index] == '\r') {
- const bytes_str = second;
- const bytes = try parseUnsigned(usize, bytes_str, 10);
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const bytes_str = second;
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + bytes + 4);
- return .{
- .subject = subject,
- .reply_to = null,
- .payload = iter.rest()[1 .. 1 + bytes],
- };
- }
+ // 4 bytes for CRLF on either side of the payload.
+ _ = try in.take(iter.index + 2);
+ defer {
+ _ = in.take(2) catch {
+ log.warn("very bad parsing issue", .{});
+ };
+ }
+ return .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = try in.take(bytes),
+ };
+ }
- const reply_to = second;
- if (iter.next()) |bytes_str| {
- const bytes = try parseUnsigned(usize, bytes_str, 10);
+ const reply_to = second;
+ if (iter.next()) |bytes_str| {
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
- if (in.buffered()[iter.index] == '\r') {
- if (iter.rest().len > bytes) {
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + bytes + 4);
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .payload = iter.rest()[1 .. 1 + bytes],
- };
+ if (in.buffered()[iter.index] == '\r') {
+ if (iter.rest().len > bytes + 2) {
+ // 4 bytes for CRLF on either side of the payload.
+ _ = try in.take(iter.index + bytes + 4);
+ return .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = iter.rest()[1 .. 1 + bytes],
+ };
+ }
}
}
}
@@ -188,8 +197,269 @@ test @"pub" {
);
try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2\r\nhi\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2" },
+ .{ .buffer = "\r\nhi\r\n " },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2" },
+ .{ .buffer = "\r\nhi\r" },
+ .{ .buffer = "\n " },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
+ }
}
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn sub(in: *Reader) !Message.Sub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |subject| {
+ if (iter.next()) |second| {
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const sid = second;
+
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .subject = subject,
+ .queue_group = null,
+ .sid = sid,
+ };
+ }
+
+ const queue_group = second;
+ if (iter.next()) |sid| {
+ if (in.buffered()[iter.index] == '\r') {
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .subject = subject,
+ .queue_group = queue_group,
+ .sid = sid,
+ };
+ }
+ }
+ }
+ }
+ }
+
+ try in.fillMore();
+ }
+}
+
+test sub {
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo q 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r" },
+ .{ .buffer = "\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+}
+
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn unsub(in: *Reader) !Message.Unsub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |sid| {
+ if (in.buffered()[iter.index] == '\r') {
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .sid = sid,
+ .max_msgs = null,
+ };
+ }
+ if (iter.next()) |max_msgs_str| {
+ if (in.buffered()[iter.index] == '\r') {
+ const max_msgs = try parseUnsigned(usize, max_msgs_str, 10);
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .sid = sid,
+ .max_msgs = max_msgs,
+ };
+ }
+ }
+ }
+
+ try in.fillMore();
+ }
+}
+
+test unsub {
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = null,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r" },
+ .{ .buffer = "\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+}
+
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn hpub(in: *Reader) !Message.HPub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+ _ = in;
+ @compileError("TODO");
+}
+
+test hpub {}
+
// /// Get the next Message from the input stream.
// pub fn next(alloc: Allocator, in: *Reader) !Message {
// errdefer log.err("Failed to parse {s}", .{operation_string.items});
@@ -218,35 +488,35 @@ test @"pub" {
// }
// }
-// pub fn connect(alloc: Allocator, in: *Reader) !Message {
-// // for storing the json string
-// var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
-// defer connect_string_writer_allocating.deinit();
-// var connect_string_writer = &connect_string_writer_allocating.writer;
+pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
-// // for parsing the json string
-// var connect_arena_allocator: ArenaAllocator = .init(alloc);
-// defer connect_arena_allocator.deinit();
-// const connect_allocator = connect_arena_allocator.allocator();
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
-// try in.discardAll(1); // throw away space
+ try in.discardAll(1); // throw away space
-// // Should read the next JSON object to the fixed buffer writer.
-// _ = try in.streamDelimiter(connect_string_writer, '}');
-// try connect_string_writer.writeByte('}');
-// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
-
-// const connect_str = try connect_string_writer_allocating.toOwnedSlice();
-// defer alloc.free(connect_str);
-// const res = try std.json.parseFromSliceLeaky(
-// Message.Connect,
-// connect_allocator,
-// connect_str,
-// .{ .allocate = .alloc_always },
-// );
-
-// return .{ .CONNECT = try res.dupe(alloc) };
-// }
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return res.dupe(alloc);
+}
// pub fn sub(alloc: Allocator, in: *Reader) !Message {
// try in.discardAll(1); // throw away space
@@ -647,12 +917,12 @@ test @"pub" {
// return subject_list.toOwnedSlice(alloc);
// }
-// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
-// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
-// @branchHint(.unlikely);
-// return error.InvalidStream;
-// }
-// }
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+ if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
// test sub {
// const alloc = std.testing.allocator;