diff options
| author | Robby Zambito <contact@robbyzambito.me> | 2026-01-07 22:48:50 -0500 |
|---|---|---|
| committer | Robby Zambito <contact@robbyzambito.me> | 2026-01-07 23:19:19 -0500 |
| commit | 45feccbad8c7306c15137a6003f3df1183d9c2a9 (patch) | |
| tree | 5a541a2e45eb2fbe8f0ec4ba3da0829d029ccd45 /src/Server/parse.zig | |
| parent | 96a3705069cf33a00ded143f876734c2a045cf1e (diff) | |
WAY FASTER but doesn't send all?
Seems to not flush the last message
Diffstat (limited to 'src/Server/parse.zig')
| -rw-r--r-- | src/Server/parse.zig | 384 |
1 files changed, 327 insertions, 57 deletions
diff --git a/src/Server/parse.zig b/src/Server/parse.zig index 6e013c4..9035311 100644 --- a/src/Server/parse.zig +++ b/src/Server/parse.zig @@ -16,8 +16,8 @@ const isWhitespace = std.ascii.isWhitespace; const parseUnsigned = std.fmt.parseUnsigned; const message = @import("./message.zig"); -pub const Message = message.Message; -pub const Payload = @import("./Payload.zig"); +const Message = message.Message; +const Payload = @import("./Payload.zig"); const client_control = StaticStringMap(message.Control).initComptime( .{ @@ -44,8 +44,10 @@ pub fn control(in: *Reader) !message.Control { } break :blk min_len; }; + // log.debug("buffered: '{s}'", .{in.buffered()}); std.debug.assert(in.buffer.len >= longest_ctrl); while (true) { + // log.debug("buffered l: '{s}'", .{in.buffered()}); var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); if (iter.next()) |str| { if (client_control.get(str)) |ctrl| { @@ -121,32 +123,39 @@ pub fn @"pub"(in: *Reader) !Message.Pub { if (iter.next()) |subject| { if (iter.next()) |second| { - if (in.buffered()[iter.index] == '\r') { - const bytes_str = second; - const bytes = try parseUnsigned(usize, bytes_str, 10); + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const bytes_str = second; + const bytes = try parseUnsigned(usize, bytes_str, 10); - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + bytes + 4); - return .{ - .subject = subject, - .reply_to = null, - .payload = iter.rest()[1 .. 1 + bytes], - }; - } + // 4 bytes for CRLF on either side of the payload. + _ = try in.take(iter.index + 2); + defer { + _ = in.take(2) catch { + log.warn("very bad parsing issue", .{}); + }; + } + return .{ + .subject = subject, + .reply_to = null, + .payload = try in.take(bytes), + }; + } - const reply_to = second; - if (iter.next()) |bytes_str| { - const bytes = try parseUnsigned(usize, bytes_str, 10); + const reply_to = second; + if (iter.next()) |bytes_str| { + const bytes = try parseUnsigned(usize, bytes_str, 10); - if (in.buffered()[iter.index] == '\r') { - if (iter.rest().len > bytes) { - // 4 bytes for CRLF on either side of the payload. - in.toss(iter.index + bytes + 4); - return .{ - .subject = subject, - .reply_to = reply_to, - .payload = iter.rest()[1 .. 1 + bytes], - }; + if (in.buffered()[iter.index] == '\r') { + if (iter.rest().len > bytes + 2) { + // 4 bytes for CRLF on either side of the payload. + _ = try in.take(iter.index + bytes + 4); + return .{ + .subject = subject, + .reply_to = reply_to, + .payload = iter.rest()[1 .. 1 + bytes], + }; + } } } } @@ -188,8 +197,269 @@ test @"pub" { ); try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = "2\r\nhi\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = null, + .payload = "hi", + }, + try @"pub"(&in.interface), + ); + try std.testing.expectEqualSlices(u8, "", in.interface.buffered()); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = "2" }, + .{ .buffer = "\r\nhi\r\n " }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = null, + .payload = "hi", + }, + try @"pub"(&in.interface), + ); + try std.testing.expectEqualSlices(u8, " ", in.interface.buffered()); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = "2" }, + .{ .buffer = "\r\nhi\r" }, + .{ .buffer = "\n " }, + }); + try std.testing.expectEqualDeep( + Message.Pub{ + .subject = "foo", + .reply_to = null, + .payload = "hi", + }, + try @"pub"(&in.interface), + ); + try std.testing.expectEqualSlices(u8, " ", in.interface.buffered()); + } } +/// The return value is owned by the reader passed to this function. +/// Operations that modify the readers buffer invalidates this value. +pub fn sub(in: *Reader) !Message.Sub { + // TODO: Add pedantic option. + // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 + + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |subject| { + if (iter.next()) |second| { + if (in.buffered().len > iter.index) { + if (in.buffered()[iter.index] == '\r') { + const sid = second; + + // 2 bytes for CRLF at the end. + _ = try in.take(iter.index + 2); + return .{ + .subject = subject, + .queue_group = null, + .sid = sid, + }; + } + + const queue_group = second; + if (iter.next()) |sid| { + if (in.buffered()[iter.index] == '\r') { + // 2 bytes for CRLF at the end. + _ = try in.take(iter.index + 2); + return .{ + .subject = subject, + .queue_group = queue_group, + .sid = sid, + }; + } + } + } + } + } + + try in.fillMore(); + } +} + +test sub { + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo q 1\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Sub{ + .subject = "foo", + .queue_group = "q", + .sid = "1", + }, + try sub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo 1\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Sub{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + try sub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = " 1\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Sub{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + try sub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = " 1\r" }, + .{ .buffer = "\n" }, + }); + try std.testing.expectEqualDeep( + Message.Sub{ + .subject = "foo", + .queue_group = null, + .sid = "1", + }, + try sub(&in.interface), + ); + } +} + +/// The return value is owned by the reader passed to this function. +/// Operations that modify the readers buffer invalidates this value. +pub fn unsub(in: *Reader) !Message.Unsub { + // TODO: Add pedantic option. + // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 + + while (true) { + var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); + + if (iter.next()) |sid| { + if (in.buffered()[iter.index] == '\r') { + // 2 bytes for CRLF at the end. + _ = try in.take(iter.index + 2); + return .{ + .sid = sid, + .max_msgs = null, + }; + } + if (iter.next()) |max_msgs_str| { + if (in.buffered()[iter.index] == '\r') { + const max_msgs = try parseUnsigned(usize, max_msgs_str, 10); + // 2 bytes for CRLF at the end. + _ = try in.take(iter.index + 2); + return .{ + .sid = sid, + .max_msgs = max_msgs, + }; + } + } + } + + try in.fillMore(); + } +} + +test unsub { + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo 1\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "foo", + .max_msgs = 1, + }, + try unsub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "foo", + .max_msgs = null, + }, + try unsub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = " 1\r\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "foo", + .max_msgs = 1, + }, + try unsub(&in.interface), + ); + } + { + var buf: [64]u8 = undefined; + var in: std.testing.Reader = .init(&buf, &.{ + .{ .buffer = "foo " }, + .{ .buffer = " 1\r" }, + .{ .buffer = "\n" }, + }); + try std.testing.expectEqualDeep( + Message.Unsub{ + .sid = "foo", + .max_msgs = 1, + }, + try unsub(&in.interface), + ); + } +} + +/// The return value is owned by the reader passed to this function. +/// Operations that modify the readers buffer invalidates this value. +pub fn hpub(in: *Reader) !Message.HPub { + // TODO: Add pedantic option. + // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 + _ = in; + @compileError("TODO"); +} + +test hpub {} + // /// Get the next Message from the input stream. // pub fn next(alloc: Allocator, in: *Reader) !Message { // errdefer log.err("Failed to parse {s}", .{operation_string.items}); @@ -218,35 +488,35 @@ test @"pub" { // } // } -// pub fn connect(alloc: Allocator, in: *Reader) !Message { -// // for storing the json string -// var connect_string_writer_allocating: AllocatingWriter = .init(alloc); -// defer connect_string_writer_allocating.deinit(); -// var connect_string_writer = &connect_string_writer_allocating.writer; +pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect { + // for storing the json string + var connect_string_writer_allocating: AllocatingWriter = .init(alloc); + defer connect_string_writer_allocating.deinit(); + var connect_string_writer = &connect_string_writer_allocating.writer; -// // for parsing the json string -// var connect_arena_allocator: ArenaAllocator = .init(alloc); -// defer connect_arena_allocator.deinit(); -// const connect_allocator = connect_arena_allocator.allocator(); + // for parsing the json string + var connect_arena_allocator: ArenaAllocator = .init(alloc); + defer connect_arena_allocator.deinit(); + const connect_allocator = connect_arena_allocator.allocator(); -// try in.discardAll(1); // throw away space + try in.discardAll(1); // throw away space -// // Should read the next JSON object to the fixed buffer writer. -// _ = try in.streamDelimiter(connect_string_writer, '}'); -// try connect_string_writer.writeByte('}'); -// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' - -// const connect_str = try connect_string_writer_allocating.toOwnedSlice(); -// defer alloc.free(connect_str); -// const res = try std.json.parseFromSliceLeaky( -// Message.Connect, -// connect_allocator, -// connect_str, -// .{ .allocate = .alloc_always }, -// ); - -// return .{ .CONNECT = try res.dupe(alloc) }; -// } + // Should read the next JSON object to the fixed buffer writer. + _ = try in.streamDelimiter(connect_string_writer, '}'); + try connect_string_writer.writeByte('}'); + try expectStreamBytes(in, "}\r\n"); // discard '}\r\n' + + const connect_str = try connect_string_writer_allocating.toOwnedSlice(); + defer alloc.free(connect_str); + const res = try std.json.parseFromSliceLeaky( + Message.Connect, + connect_allocator, + connect_str, + .{ .allocate = .alloc_always }, + ); + + return res.dupe(alloc); +} // pub fn sub(alloc: Allocator, in: *Reader) !Message { // try in.discardAll(1); // throw away space @@ -647,12 +917,12 @@ test @"pub" { // return subject_list.toOwnedSlice(alloc); // } -// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { -// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { -// @branchHint(.unlikely); -// return error.InvalidStream; -// } -// } +inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void { + if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { + @branchHint(.unlikely); + return error.InvalidStream; + } +} // test sub { // const alloc = std.testing.allocator; |
