const std = @import("std");
const ArenaAllocator = std.heap.ArenaAllocator;
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const Reader = std.Io.Reader;
const Writer = std.Io.Writer;
const AllocatingWriter = std.Io.Writer.Allocating;
const StaticStringMap = std.StaticStringMap;

const log = std.log.scoped(.zits);

const isDigit = std.ascii.isDigit;
const isUpper = std.ascii.isUpper;
const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;

const message = @import("./message.zig");
const Message = message.Message;
const Payload = @import("./Payload.zig");

/// Errors every parser in this file can return.
/// `InvalidStream` means the bytes read do not form a message these parsers
/// accept (or the message cannot fit in the reader's buffer).
pub const Error = error{
    EndOfStream,
    ReadFailed,
    InvalidStream,
};

/// Control verbs recognised by `control`. The commented-out entries are
/// control values that are deliberately not accepted here.
const client_control = StaticStringMap(message.Control).initComptime(
    .{
        // {"INFO", .info},
        .{ @tagName(.CONNECT), .CONNECT },
        .{ @tagName(.PUB), .PUB },
        .{ @tagName(.HPUB), .HPUB },
        .{ @tagName(.SUB), .SUB },
        .{ @tagName(.UNSUB), .UNSUB },
        // {"MSG", .msg},
        // {"HMSG", .hmsg},
        .{ @tagName(.PING), .PING },
        .{ @tagName(.PONG), .PONG },
        // {"+OK", .@"+ok"},
        // {"-ERR", .@"-err"},
    },
);

/// Returns `prefix_len + body_len`, i.e. how many buffered bytes a complete
/// frame occupies, or `error.InvalidStream` when that sum overflows `usize`
/// or is larger than the capacity of `in`'s buffer.
///
/// `body_len` typically comes straight from the peer, so the addition must be
/// checked, and the result is later handed to `Reader.fill`, which must never
/// be asked for more bytes than the buffer can hold.
fn checkedFrameLen(in: *const Reader, prefix_len: usize, body_len: usize) error{InvalidStream}!usize {
    const total = std.math.add(usize, prefix_len, body_len) catch return error.InvalidStream;
    if (total > in.buffer.len) return error.InvalidStream;
    return total;
}

/// Reads the control verb at the start of the stream and consumes it
/// (including any delimiter bytes that precede it). The delimiter that
/// follows the verb is left in the buffer.
///
/// Returns `error.InvalidStream` when the first token cannot be one of the
/// verbs in `client_control`: either it is already at least as long as the
/// longest verb, or it is terminated by a delimiter without matching.
///
/// Asserts that the reader's buffer can hold the longest control verb.
pub fn control(in: *Reader) Error!message.Control {
    const longest_ctrl = comptime blk: {
        var max_len = 0;
        for (client_control.keys()) |ctrl| {
            max_len = @max(ctrl.len, max_len);
        }
        break :blk max_len;
    };
    std.debug.assert(in.buffer.len >= longest_ctrl);

    // Wait until enough text to hold the shortest control verb is available.
    try in.fill(3);
    while (true) {
        const buffered = in.buffered();
        var iter = std.mem.tokenizeAny(u8, buffered, " \t\r");
        if (iter.next()) |str| {
            if (client_control.get(str)) |ctrl| {
                // `iter.index` is the end of the token within the buffer, so
                // this also consumes delimiters skipped before the verb.
                in.toss(iter.index);
                return ctrl;
            }
            // The token is too long to be a verb, or a delimiter already
            // follows it; more input cannot turn it into a valid verb.
            if (str.len >= longest_ctrl or iter.index < buffered.len) {
                return error.InvalidStream;
            }
        }
        log.debug("filling more in control.", .{});
        try in.fillMore();
    }
}

test control {
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "PUB " },
        });
        try std.testing.expectEqual(.PUB, try control(&in.interface));
        try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
    }
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "PUB" },
        });
        try std.testing.expectEqual(.PUB, try control(&in.interface));
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "PU" },
            .{ .buffer = "B" },
        });
        try std.testing.expectEqual(.PUB, try control(&in.interface));
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "PIN" },
            .{ .buffer = "G\r\n" },
        });
        try std.testing.expectEqual(.PING, try control(&in.interface));
        try std.testing.expectEqualSlices(u8, "\r\n", in.interface.buffered());
    }
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "CONNECT" },
        });
        try std.testing.expectEqual(.CONNECT, try control(&in.interface));
    }
    {
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "CONNECC" },
        });
        try std.testing.expectError(error.InvalidStream, control(&in.interface));
    }
    {
        // Leading delimiters are consumed together with the verb.
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = " PUB " },
        });
        try std.testing.expectEqual(.PUB, try control(&in.interface));
        try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
    }
    {
        // A complete (delimiter-terminated) unknown token is rejected.
        var buf: [7]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "FOO " },
        });
        try std.testing.expectError(error.InvalidStream, control(&in.interface));
    }
}

/// Parses the remainder of a PUB message, `<subject> [reply-to] <#bytes>`,
/// followed by CRLF, the payload and a closing CRLF, and consumes all of it.
///
/// Returns `error.InvalidStream` when the byte count is not an unsigned
/// decimal number or when the whole message cannot fit in the reader's buffer.
/// The two bytes after the payload are consumed without being checked.
///
/// The return value is owned by the reader passed to this function.
/// Operations that modify the reader's buffer invalidate this value.
pub fn @"pub"(in: *Reader) Error!Message.Pub {
    // TODO: Add pedantic option.
    // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
    while (true) {
        var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");

        if (iter.next()) |subject| {
            if (iter.next()) |second| {
                if (in.buffered().len > iter.index) {
                    if (in.buffered()[iter.index] == '\r') {
                        // Two tokens then CR: `<subject> <#bytes>`.
                        const bytes_str = second;
                        const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;

                        // 4 bytes for CRLF on either side of the payload.
                        const needed = try checkedFrameLen(in, iter.index + 4, bytes);
                        if (in.buffered().len < needed) {
                            try in.fill(needed);
                            // Fill may shift buffer, so we have to retokenize it.
                            continue;
                        }

                        in.toss(iter.index + 2);
                        // Runs after the payload has been taken: drops the closing CRLF.
                        defer in.toss(2);
                        return .{
                            .subject = subject,
                            .reply_to = null,
                            .payload = in.take(bytes) catch unreachable,
                        };
                    }

                    // Otherwise the second token is the reply-to subject.
                    const reply_to = second;
                    if (iter.next()) |bytes_str| {
                        const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;

                        if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
                            // 4 bytes for CRLF on either side of the payload.
                            const needed = try checkedFrameLen(in, iter.index + 4, bytes);
                            if (in.buffered().len < needed) {
                                try in.fill(needed);
                                // Fill may shift buffer, so we have to retokenize it.
                                continue;
                            }

                            in.toss(iter.index + 2);
                            // Runs after the payload has been taken: drops the closing CRLF.
                            defer in.toss(2);
                            return .{
                                .subject = subject,
                                .reply_to = reply_to,
                                .payload = in.take(bytes) catch unreachable,
                            };
                        }
                    }
                }
            }
        }

        try in.fillMore();
    }
}

test @"pub" {
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo bar 2\r\nhi\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Pub{
                .subject = "foo",
                .reply_to = "bar",
                .payload = "hi",
            },
            try @"pub"(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo 2\r\nhi\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Pub{
                .subject = "foo",
                .reply_to = null,
                .payload = "hi",
            },
            try @"pub"(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = "2\r\nhi\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Pub{
                .subject = "foo",
                .reply_to = null,
                .payload = "hi",
            },
            try @"pub"(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = "2" },
            .{ .buffer = "\r\nhi\r\n " },
        });
        try std.testing.expectEqualDeep(
            Message.Pub{
                .subject = "foo",
                .reply_to = null,
                .payload = "hi",
            },
            try @"pub"(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = "2" },
            .{ .buffer = "\r\nhi\r" },
            .{ .buffer = "\n " },
        });
        try std.testing.expectEqualDeep(
            Message.Pub{
                .subject = "foo",
                .reply_to = null,
                .payload = "hi",
            },
            try @"pub"(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
    }
    {
        // A declared payload that cannot fit in the buffer is rejected
        // instead of being passed on to `fill`.
        var buf: [16]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo 100\r\n" },
        });
        try std.testing.expectError(error.InvalidStream, @"pub"(&in.interface));
    }
}

/// Parses the remainder of a SUB message, `<subject> [queue group] <sid>`
/// followed by CRLF, and consumes it.
///
/// Returns `error.InvalidStream` when the line cannot fit in the reader's
/// buffer.
///
/// The return value is owned by the reader passed to this function.
/// Operations that modify the reader's buffer invalidate this value.
pub fn sub(in: *Reader) Error!Message.Sub {
    // TODO: Add pedantic option.
    // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
    while (true) {
        var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");

        if (iter.next()) |subject| {
            if (iter.next()) |second| {
                if (in.buffered().len > iter.index) {
                    if (in.buffered()[iter.index] == '\r') {
                        // Two tokens then CR: `<subject> <sid>`.
                        const sid = second;

                        // 2 bytes for CRLF at the end.
                        const needed = try checkedFrameLen(in, iter.index + 2, 0);
                        if (in.buffered().len < needed) {
                            try in.fill(needed);
                            // Fill may shift buffer, so we have to retokenize it.
                            continue;
                        }

                        in.toss(needed);
                        return .{
                            .subject = subject,
                            .queue_group = null,
                            .sid = sid,
                        };
                    }

                    // Otherwise the second token is the queue group.
                    const queue_group = second;
                    if (iter.next()) |sid| {
                        if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
                            // 2 bytes for CRLF at the end.
                            const needed = try checkedFrameLen(in, iter.index + 2, 0);
                            if (in.buffered().len < needed) {
                                try in.fill(needed);
                                // Fill may shift buffer, so we have to retokenize it.
                                continue;
                            }

                            in.toss(needed);
                            return .{
                                .subject = subject,
                                .queue_group = queue_group,
                                .sid = sid,
                            };
                        }
                    }
                }
            }
        }

        try in.fillMore();
    }
}

test sub {
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo q 1\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Sub{
                .subject = "foo",
                .queue_group = "q",
                .sid = "1",
            },
            try sub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo 1\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Sub{
                .subject = "foo",
                .queue_group = null,
                .sid = "1",
            },
            try sub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = " 1\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Sub{
                .subject = "foo",
                .queue_group = null,
                .sid = "1",
            },
            try sub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = " 1\r" },
            .{ .buffer = "\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Sub{
                .subject = "foo",
                .queue_group = null,
                .sid = "1",
            },
            try sub(&in.interface),
        );
    }
}

/// Parses the remainder of an UNSUB message, `<sid> [max_msgs]` followed by
/// CRLF, and consumes it.
///
/// If the stream ends before a terminated line is seen, whatever tokens are
/// buffered are parsed as the message and nothing is consumed.
///
/// Returns `error.InvalidStream` when `max_msgs` is not an unsigned decimal
/// number or when the line cannot fit in the reader's buffer.
///
/// The return value is owned by the reader passed to this function.
/// Operations that modify the reader's buffer invalidate this value.
pub fn unsub(in: *Reader) Error!Message.Unsub {
    // TODO: Add pedantic option.
    // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
    while (true) {
        var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n");

        if (iter.next()) |sid| {
            if (in.buffered().len > iter.index) {
                if (in.buffered()[iter.index] == '\r') {
                    // 2 bytes for CRLF at the end.
                    const needed = try checkedFrameLen(in, iter.index + 2, 0);
                    if (in.buffered().len < needed) {
                        try in.fill(needed);
                        // Fill may shift buffer, so we have to retokenize it.
                        continue;
                    }

                    in.toss(needed);
                    return .{
                        .sid = sid,
                        .max_msgs = null,
                    };
                }

                if (iter.next()) |max_msgs_str| {
                    if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
                        const max_msgs = parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream;

                        // 2 bytes for CRLF at the end.
                        const needed = try checkedFrameLen(in, iter.index + 2, 0);
                        if (in.buffered().len < needed) {
                            try in.fill(needed);
                            // Fill may shift buffer, so we have to retokenize it.
                            continue;
                        }

                        in.toss(needed);
                        return .{
                            .sid = sid,
                            .max_msgs = max_msgs,
                        };
                    }
                }
            }
        }

        in.fillMore() catch |err| switch (err) {
            error.EndOfStream => {
                // No more input is coming: parse what was buffered before the
                // failed read.
                // NOTE(review): this reuses the slice captured before
                // `fillMore`; confirm a failed `fillMore` never moves
                // buffered data.
                iter.reset();
                const sid = iter.next() orelse return error.EndOfStream;
                const max_msgs = if (iter.next()) |max_msgs_str| blk: {
                    log.debug("max_msgs: {any}", .{max_msgs_str});
                    break :blk parseUnsigned(usize, max_msgs_str, 10) catch return error.InvalidStream;
                } else null;
                return .{
                    .sid = sid,
                    .max_msgs = max_msgs,
                };
            },
            else => |e| return e,
        };
    }
}

test unsub {
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo 1\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Unsub{
                .sid = "foo",
                .max_msgs = 1,
            },
            try unsub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Unsub{
                .sid = "foo",
                .max_msgs = null,
            },
            try unsub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = " 1\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Unsub{
                .sid = "foo",
                .max_msgs = 1,
            },
            try unsub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo " },
            .{ .buffer = " 1\r" },
            .{ .buffer = "\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Unsub{
                .sid = "foo",
                .max_msgs = 1,
            },
            try unsub(&in.interface),
        );
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = " 1\r" },
            .{ .buffer = "\n" },
        });
        try std.testing.expectEqualDeep(
            Message.Unsub{
                .sid = "1",
                .max_msgs = null,
            },
            try unsub(&in.interface),
        );
    }
}

/// Parses the remainder of an HPUB message,
/// `<subject> [reply-to] <#header bytes> <#total bytes>`, followed by CRLF,
/// the headers-plus-payload section and a closing CRLF, and consumes all of it.
/// The returned payload slice spans the whole section (`total bytes` long).
///
/// Returns `error.InvalidStream` when either count is not an unsigned decimal
/// number, when the header count exceeds the total count, or when the whole
/// message cannot fit in the reader's buffer. The two bytes after the section
/// are consumed without being checked.
///
/// The return value is owned by the reader passed to this function.
/// Operations that modify the reader's buffer invalidate this value.
pub fn hpub(in: *Reader) Error!Message.HPub {
    // TODO: Add pedantic option.
    // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
    while (true) {
        var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");

        if (iter.next()) |subject| {
            if (iter.next()) |second| {
                if (iter.next()) |third| {
                    if (in.buffered().len > iter.index) {
                        if (in.buffered()[iter.index] == '\r') {
                            // Three tokens then CR: no reply-to subject.
                            const header_bytes_str = second;
                            const total_bytes_str = third;

                            const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
                            const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
                            // The total covers headers and payload, so it can
                            // never be smaller than the header section alone.
                            if (header_bytes > total_bytes) return error.InvalidStream;

                            // 4 bytes for CRLF on either side of headers and payload.
                            const needed = try checkedFrameLen(in, iter.index + 4, total_bytes);
                            if (in.buffered().len < needed) {
                                try in.fill(needed);
                                // Fill may shift buffer, so we have to retokenize it.
                                continue;
                            }

                            in.toss(iter.index + 2);
                            // Runs after the section has been taken: drops the closing CRLF.
                            defer in.toss(2);
                            return .{
                                .header_bytes = header_bytes,
                                .@"pub" = .{
                                    .subject = subject,
                                    .reply_to = null,
                                    .payload = in.take(total_bytes) catch unreachable,
                                },
                            };
                        }
                    }

                    // Otherwise the second token is the reply-to subject.
                    const reply_to = second;
                    const header_bytes_str = third;
                    if (iter.next()) |total_bytes_str| {
                        if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
                            const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
                            const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
                            // The total covers headers and payload, so it can
                            // never be smaller than the header section alone.
                            if (header_bytes > total_bytes) return error.InvalidStream;

                            // 4 bytes for CRLF on either side of headers and payload.
                            const needed = try checkedFrameLen(in, iter.index + 4, total_bytes);
                            if (in.buffered().len < needed) {
                                try in.fill(needed);
                                // Fill may shift buffer, so we have to retokenize it.
                                continue;
                            }

                            in.toss(iter.index + 2);
                            // Runs after the section has been taken: drops the closing CRLF.
                            defer in.toss(2);
                            return .{
                                .header_bytes = header_bytes,
                                .@"pub" = .{
                                    .subject = subject,
                                    .reply_to = reply_to,
                                    .payload = in.take(total_bytes) catch unreachable,
                                },
                            };
                        }
                    }
                }
            }
        }

        try in.fillMore();
    }
}

test hpub {
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.HPub{
                .header_bytes = 22,
                .@"pub" = .{
                    .subject = "foo",
                    .reply_to = null,
                    .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
                },
            },
            try hpub(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = " foo reply 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
        });
        try std.testing.expectEqualDeep(
            Message.HPub{
                .header_bytes = 22,
                .@"pub" = .{
                    .subject = "foo",
                    .reply_to = "reply",
                    .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
                },
            },
            try hpub(&in.interface),
        );
        try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
    }
    {
        // The header section cannot be longer than the declared total.
        var buf: [64]u8 = undefined;
        var in: std.testing.Reader = .init(&buf, &.{
            .{ .buffer = "foo 5 2\r\nhi\r\n" },
        });
        try std.testing.expectError(error.InvalidStream, hpub(&in.interface));
    }
    // TODO: more tests
}

/// Parses the remainder of a CONNECT message: one separator byte, a JSON
/// object, then CRLF.
///
/// The JSON text is taken to end at the first `}` in the stream, so an object
/// containing `}` before its real end (nested object, string value) is cut
/// short and reported as `error.InvalidStream`.
///
/// The result is produced by `Message.Connect.dupe(alloc)`; temporary
/// allocations made while parsing are released before returning.
pub fn connect(alloc: Allocator, in: *Reader) (error{OutOfMemory} || Error)!Message.Connect {
    // for storing the json string
    var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
    defer connect_string_writer_allocating.deinit();
    const connect_string_writer = &connect_string_writer_allocating.writer;

    // for parsing the json string
    var connect_arena_allocator: ArenaAllocator = .init(alloc);
    defer connect_arena_allocator.deinit();
    const connect_allocator = connect_arena_allocator.allocator();

    try in.discardAll(1); // throw away space

    // Copy everything up to (not including) the first '}' into the writer.
    _ = in.streamDelimiter(connect_string_writer, '}') catch |err| switch (err) {
        error.WriteFailed => return error.OutOfMemory,
        else => |e| return e,
    };
    // The delimiter was not copied, so restore it to complete the object.
    connect_string_writer.writeByte('}') catch return error.OutOfMemory;
    try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'

    const connect_str = try connect_string_writer_allocating.toOwnedSlice();
    defer alloc.free(connect_str);

    const res = std.json.parseFromSliceLeaky(
        Message.Connect,
        connect_allocator,
        connect_str,
        .{ .allocate = .alloc_always },
    ) catch return error.InvalidStream;

    return res.dupe(alloc);
}

/// Consumes `expected.len` bytes from `reader` and returns
/// `error.InvalidStream` unless they are exactly `expected`.
inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
    if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
        @branchHint(.unlikely);
        return error.InvalidStream;
    }
}