diff options
Diffstat (limited to 'src/Server/parse.zig')
| -rw-r--r-- | src/Server/parse.zig | 835 |
1 files changed, 835 insertions, 0 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig new file mode 100644 index 0000000..d58c0e5 --- /dev/null +++ b/src/Server/parse.zig @@ -0,0 +1,835 @@ +const std = @import("std"); +const ArenaAllocator = std.heap.ArenaAllocator; +const Allocator = std.mem.Allocator; +const ArrayList = std.ArrayList; +const Reader = std.Io.Reader; +const Writer = std.Io.Writer; +const AllocatingWriter = std.Io.Writer.Allocating; +const StaticStringMap = std.StaticStringMap; + +const log = std.log; + +const isDigit = std.ascii.isDigit; +const isUpper = std.ascii.isUpper; +const isWhitespace = std.ascii.isWhitespace; + +const parseUnsigned = std.fmt.parseUnsigned; + +const message = @import("./parse/message.zig"); +pub const Message = message.Message; +pub const Payload = @import("./parse/Payload.zig"); + +const client_types = StaticStringMap(message.Control).initComptime( + .{ + // {"INFO", .info}, + .{ @tagName(.CONNECT), .CONNECT }, + .{ @tagName(.PUB), .PUB }, + .{ @tagName(.HPUB), .HPUB }, + .{ @tagName(.SUB), .SUB }, + .{ @tagName(.UNSUB), .UNSUB }, + // {"MSG", .msg}, + // {"HMSG", .hmsg}, + .{ @tagName(.PING), .PING }, + .{ @tagName(.PONG), .PONG }, + // {"+OK", .@"+ok"}, + // {"-ERR", .@"-err"}, + }, +); +fn parseStaticStringMap(input: []const u8) ?message.Control { + return client_types.get(input); +} + +/// Parse a string into its associated MessageType. +const parse = parseStaticStringMap; + +/// Get the next Message from the input stream. +pub fn next(alloc: Allocator, in: *Reader) !Message { + var operation_string: ArrayList(u8) = blk: { + comptime var buf_len = 0; + comptime { + for (client_types.keys()) |key| { + buf_len = @max(buf_len, key.len); + } + } + var buf: [buf_len]u8 = undefined; + break :blk .initBuffer(&buf); + }; + + while (in.peekByte()) |byte| { + if (isUpper(byte)) { + try operation_string.appendBounded(byte); + in.toss(1); + } else break; + } else |err| return err; + + const operation = parse(operation_string.items) orelse { + log.err("Invalid operation: '{s}'", .{operation_string.items}); + return error.InvalidOperation; + }; + + errdefer log.err("Failed to parse {s}", .{operation_string.items}); + + switch (operation) { + .CONNECT => return connect(alloc, in), + .PUB => { + @branchHint(.likely); + return @"pub"(alloc, in); + }, + .HPUB => { + @branchHint(.likely); + return hpub(alloc, in); + }, + .PING => { + try expectStreamBytes(in, "\r\n"); + return .PING; + }, + .PONG => { + try expectStreamBytes(in, "\r\n"); + return .PONG; + }, + .SUB => return sub(alloc, in), + .UNSUB => return unsub(alloc, in), + else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), + } +} + +pub fn connect(alloc: Allocator, in: *Reader) !Message { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; + + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); + + try in.discardAll(1); // throw away space + + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return .{ .CONNECT = try res.dupe(alloc) }; +} + +pub fn sub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + const subject = try readSubject(alloc, in, .sub); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + errdefer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + errdefer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .SUB = .{ + .subject = subject, + .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, + .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), + }, + }; +} + +pub fn unsub(alloc: Allocator, in: *Reader) !Message { + const States = enum { + before_first, + in_first, + after_first, + in_second, + in_end, + }; + + var first: ArrayList(u8) = .empty; + errdefer first.deinit(alloc); + var second: ?ArrayList(u8) = null; + defer if (second) |*s| s.deinit(alloc); + + sw: switch (@as(States, .before_first)) { + .before_first => { + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_first; + } + continue :sw .in_first; + }, + .in_first => { + const byte = try in.peekByte(); + if (!isWhitespace(byte)) { + try first.append(alloc, byte); + in.toss(1); + continue :sw .in_first; + } + continue :sw .after_first; + }, + .after_first => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_first; + } + second = .empty; + continue :sw .in_second; + }, + .in_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } + try second.?.append(alloc, byte); + in.toss(1); + continue :sw .in_second; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + return .{ + .UNSUB = .{ + .sid = try first.toOwnedSlice(alloc), + .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, + }, + }; +} + +pub fn @"pub"(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ?ArrayList(u8) = null; + defer if (third) |*t| t.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const bytes: usize = + if (third) |t| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, t.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + }; + + const payload: Payload = try .read(alloc, in, bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .PUB = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }; +} + +pub fn hpub(alloc: Allocator, in: *Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in, .@"pub"); + errdefer alloc.free(subject); + + const States = enum { + before_second, + in_second, + after_second, + in_third, + after_third, + in_fourth, + in_end, + }; + + var second: ArrayList(u8) = .empty; + defer second.deinit(alloc); + var third: ArrayList(u8) = .empty; + defer third.deinit(alloc); + var fourth: ?ArrayList(u8) = null; + defer if (fourth) |*f| f.deinit(alloc); + + sw: switch (@as(States, .before_second)) { + .before_second => { + // Drop whitespace + const byte = try in.peekByte(); + if (isWhitespace(byte)) { + in.toss(1); + continue :sw .before_second; + } + continue :sw .in_second; + }, + .in_second => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_second; + }, + .after_second => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_second; + } + third = .empty; + continue :sw .in_third; + }, + .in_third => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .after_third; + }, + .after_third => { + const byte = try in.peekByte(); + if (byte == '\r') { + continue :sw .in_end; + } else if (isWhitespace(byte)) { + in.toss(1); + continue :sw .after_third; + } + fourth = .empty; + continue :sw .in_fourth; + }, + .in_fourth => { + for (1..in.buffer.len) |i| { + try in.fill(i + 1); + if (isWhitespace(in.buffered()[i])) { + @memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]); + in.toss(i); + break; + } + } else return error.EndOfStream; + continue :sw .in_end; + }, + .in_end => { + try expectStreamBytes(in, "\r\n"); + }, + } + + const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = + if (fourth) |f| .{ + try alloc.dupe(u8, second.items), + try parseUnsigned(usize, third.items, 10), + try parseUnsigned(usize, f.items, 10), + } else .{ + null, + try parseUnsigned(usize, second.items, 10), + try parseUnsigned(usize, third.items, 10), + }; + + const payload: Payload = try .read(alloc, in, total_bytes); + errdefer payload.deinit(alloc); + try expectStreamBytes(in, "\r\n"); + + return .{ + .HPUB = .{ + .header_bytes = header_bytes, + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }, + }; +} + +fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { + var subject_list: ArrayList(u8) = .empty; + errdefer subject_list.deinit(alloc); + + // Handle the first character + { + const byte = try in.takeByte(); + if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) + return error.InvalidStream; + + try subject_list.append(alloc, byte); + } + + switch (pub_or_sub) { + .sub => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '>') { + const next_byte = try in.takeByte(); + if (!isWhitespace(next_byte)) + return error.InvalidStream; + } else if (byte == '*') { + const next_byte = try in.peekByte(); + if (next_byte != '.' and !isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + .@"pub" => { + while (in.takeByte()) |byte| { + if (isWhitespace(byte)) break; + if (byte == '*' or byte == '>') return error.InvalidStream; + if (byte == '.') { + const next_byte = try in.peekByte(); + if (next_byte == '.' or isWhitespace(next_byte)) + return error.InvalidStream; + } + try subject_list.append(alloc, byte); + } else |err| return err; + }, + } + + return subject_list.toOwnedSlice(alloc); +} + +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { + if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} + +test sub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo q 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" 1 q 1\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "1", + .queue_group = "q", + .sid = "1", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" $SRV.PING 4\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "$SRV.PING", + .queue_group = null, + .sid = "4", + }, + }, + res, + ); + } + { + var in: Reader = .fixed(" foo.echo q 10\r\n"); + var res = try sub(alloc, &in); + defer res.SUB.deinit(alloc); + try expectEqualDeep( + Message{ + .SUB = .{ + .subject = "foo.echo", + .queue_group = "q", + .sid = "10", + }, + }, + res, + ); + } +} + +test unsub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" 1\r\n"); + var res = try unsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = null, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" 1 1\r\n"); + var res = try unsub(alloc, &in); + defer res.UNSUB.deinit(alloc); + try expectEqualDeep( + Message{ + .UNSUB = .{ + .sid = "1", + .max_msgs = 1, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +test @"pub" { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + // numeric reply subject + { + var in: Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try @"pub"(alloc, &in); + defer res.PUB.deinit(alloc); + try expectEqualDeep( + Message{ + .PUB = .{ + .subject = "foo", + .reply_to = "5", + .payload = .{ + .len = 3, + .short = blk: { + var s: [128]u8 = undefined; + @memcpy(s[0..3], "bar"); + break :blk s; + }, + .long = null, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} + +test hpub { + const alloc = std.testing.allocator; + const expectEqualDeep = std.testing.expectEqualDeep; + const expectEqual = std.testing.expectEqual; + { + var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try hpub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try hpub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } + + { + var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); + var res = try hpub(alloc, &in); + defer res.HPUB.deinit(alloc); + try expectEqualDeep( + Message{ + .HPUB = .{ + .header_bytes = 22, + .@"pub" = .{ + .subject = "foo", + .reply_to = "6", + .payload = .{ + .len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len, + .short = blk: { + var s: [128]u8 = undefined; + const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"; + @memcpy(s[0..str.len], str); + break :blk s; + }, + .long = null, + }, + }, + }, + }, + res, + ); + try expectEqual(0, in.buffered().len); + } +} |
