summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-07 22:48:50 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-07 23:19:19 -0500
commit45feccbad8c7306c15137a6003f3df1183d9c2a9 (patch)
tree5a541a2e45eb2fbe8f0ec4ba3da0829d029ccd45
parent96a3705069cf33a00ded143f876734c2a045cf1e (diff)
WAY FASTER but doesn't send all?
Seems to not flush the last message
-rw-r--r--src/Server.zig141
-rw-r--r--src/Server/Client.zig25
-rw-r--r--src/Server/message.zig7
-rw-r--r--src/Server/parse.zig384
-rw-r--r--src/main.zig6
-rw-r--r--src/subcommand/serve.zig2
6 files changed, 426 insertions, 139 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 21eecb4..b5f9ee9 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -13,10 +13,11 @@ const Stream = std.Io.net.Stream;
pub const Client = @import("./Server/Client.zig");
-pub const parse = @import("./Server/parse.zig");
+pub const message = @import("./Server/message.zig");
+const parse = message.parse;
-const MessageType = parse.MessageType;
-const Message = parse.Message;
+const MessageType = message.Control;
+const Message = message.Message;
const ServerInfo = Message.ServerInfo;
const Msgs = Client.Msgs;
@@ -29,9 +30,8 @@ const Subscription = struct {
client_id: usize,
sid: []const u8,
queue_group: ?[]const u8,
- queue: *Queue(Msgs),
- // used to alloc messages in the queue
- alloc: Allocator,
+ queue_lock: *Mutex,
+ queue: *Queue(u8),
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
@@ -168,66 +168,57 @@ fn handleConnection(
const in = &reader.interface;
// Set up buffer queue
- const qbuf: []Message = try alloc.alloc(Message, 16);
+ const qbuf: []u8 = try alloc.alloc(u8, r_buf_size);
defer alloc.free(qbuf);
- var recv_queue: Queue(Message) = .init(qbuf);
+ var recv_queue: Queue(u8) = .init(qbuf);
defer recv_queue.close(io);
- const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
- defer alloc.free(mbuf);
- var msgs_queue: Queue(Msgs) = .init(mbuf);
- defer {
- msgs_queue.close(io);
- while (msgs_queue.getOne(io)) |msg| {
- switch (msg) {
- .MSG => |m| m.deinit(alloc),
- .HMSG => |h| h.deinit(alloc),
- }
- } else |_| {}
- }
-
// Create client
- var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out);
+ var client: Client = .init(null, &recv_queue, in, out);
defer client.deinit(server_allocator);
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client
- // try recv_queue.putOne(io, .PONG);
- try recv_queue.putOne(io, .{ .INFO = server.info });
+ _ = try out.write("INFO ");
+ try std.json.Stringify.value(server.info, .{}, out);
+ _ = try out.write("\r\n");
+ try out.flush();
var client_task = try io.concurrent(Client.start, .{ &client, io });
defer client_task.cancel(io) catch {};
- while (client.next(server_allocator)) |ctrl| {
+ while (client.next()) |ctrl| {
switch (ctrl) {
.PING => {
// Respond to ping with pong.
try client.recv_queue_write_lock.lock(io);
defer client.recv_queue_write_lock.unlock(io);
- try client.send(io, "PONG\r\n");
+ _ = try client.from_client.take(2);
+ try client.recv_queue.putAll(io, "PONG\r\n");
+ // try client.send(io, "PONG\r\n");
},
.PUB => {
@branchHint(.likely);
- try server.publishMessage(io, server_allocator, &client, msg);
+ // log.debug("received a pub msg", .{});
+ try server.publishMessage(io, server_allocator, &client, .@"pub");
},
.HPUB => {
@branchHint(.likely);
- try server.publishMessage(io, server_allocator, &client, msg);
+ try server.publishMessage(io, server_allocator, &client, .hpub);
},
.SUB => {
- try server.subscribe(io, server_allocator, client, id, sub);
+ try server.subscribe(io, server_allocator, &client, id);
},
.UNSUB => {
- defer unsub.deinit(server_allocator);
- try server.unsubscribe(io, server_allocator, id, unsub);
+ try server.unsubscribe(io, server_allocator, client, id);
},
.CONNECT => {
if (client.connect) |*current| {
current.deinit(server_allocator);
}
- client.connect = connect;
+ client.connect = try parse.connect(server_allocator, client.from_client);
},
else => |e| {
panic("Unimplemented message: {any}\n", .{e});
@@ -279,19 +270,26 @@ fn publishMessage(
io: Io,
alloc: Allocator,
source_client: *Client,
- msg: Message,
+ comptime pub_or_hpub: enum { @"pub", hpub },
) !void {
defer if (source_client.connect) |c| {
if (c.verbose) {
- source_client.send(io, .@"+OK") catch {};
+ if (source_client.recv_queue_write_lock.lock(io)) |_| {
+ defer source_client.recv_queue_write_lock.unlock(io);
+ source_client.recv_queue.putAll(io, "+OK\r\n") catch {};
+ } else |_| {}
}
};
- const subject = switch (msg) {
- .PUB => |pb| pb.subject,
- .HPUB => |hp| hp.@"pub".subject,
- else => unreachable,
- };
+ _ = pub_or_hpub;
+
+ const msg = try parse.@"pub"(source_client.from_client);
+
+ // const subject = switch (pub_or_hpub) {
+ // .PUB => |pb| pb.subject,
+ // .HPUB => |hp| hp.@"pub".subject,
+ // else => unreachable,
+ // };
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
var published_queue_groups: ArrayList([]const u8) = .empty;
@@ -301,7 +299,7 @@ fn publishMessage(
subs: for (0..server.subscriptions.items.len) |i| {
const subscription = server.subscriptions.items[i];
- if (subjectMatches(subscription.subject, subject)) {
+ if (subjectMatches(subscription.subject, msg.subject)) {
if (subscription.queue_group) |sg| {
for (published_queue_groups.items) |g| {
if (eql(u8, g, sg)) {
@@ -314,19 +312,46 @@ fn publishMessage(
// to prioritize other subscriptions in the queue next time.
try published_queue_sub_idxs.append(alloc, i);
}
- switch (msg) {
- .PUB => |pb| {
- try subscription.queue.putOne(io, .{
- .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
- });
- },
- .HPUB => |hp| {
- try subscription.queue.putOne(io, .{
- .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
- });
+
+ const m = msg.toMsg(subscription.sid);
+ var msg_line_buf: [1024]u8 = undefined;
+ var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf);
+
+ // try self.to_client.print(
+ // ,
+
+ // );
+ // try m.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ try msg_line_writer.print(
+ "MSG {s} {s} {s} {d}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
},
- else => unreachable,
- }
+ );
+
+ try subscription.queue_lock.lock(io);
+ defer subscription.queue_lock.unlock(io);
+ try subscription.queue.putAll(io, msg_line_writer.buffered());
+ try subscription.queue.putAll(io, m.payload);
+ try subscription.queue.putAll(io, "\r\n");
+
+ // switch (msg) {
+ // .PUB => |pb| {
+ // try subscription.queue.putOne(io, .{
+ // .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
+ // });
+ // },
+ // .HPUB => |hp| {
+ // try subscription.queue.putOne(io, .{
+ // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
+ // });
+ // },
+ // else => unreachable,
+ // }
}
}
@@ -340,10 +365,11 @@ fn subscribe(
server: *Server,
io: Io,
gpa: Allocator,
- client: Client,
+ client: *Client,
id: usize,
- msg: Message.Sub,
+ // msg: Message.Sub,
) !void {
+ const msg = try parse.sub(client.from_client);
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject);
@@ -357,8 +383,8 @@ fn subscribe(
.client_id = id,
.sid = sid,
.queue_group = queue_group,
- .queue = client.msg_queue,
- .alloc = client.alloc,
+ .queue_lock = &client.recv_queue_write_lock,
+ .queue = client.recv_queue,
});
}
@@ -366,9 +392,10 @@ fn unsubscribe(
server: *Server,
io: Io,
gpa: Allocator,
+ client: Client,
id: usize,
- msg: Message.Unsub,
) !void {
+ const msg = try parse.unsub(client.from_client);
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const len = server.subscriptions.items.len;
diff --git a/src/Server/Client.zig b/src/Server/Client.zig
index 047f38d..a7dd007 100644
--- a/src/Server/Client.zig
+++ b/src/Server/Client.zig
@@ -1,5 +1,6 @@
-const parse = @import("parse.zig");
-const Message = parse.Message;
+const message = @import("message.zig");
+const parse = message.parse;
+const Message = message.Message;
const std = @import("std");
const Queue = std.Io.Queue;
@@ -68,17 +69,7 @@ pub fn start(self: *Client, io: std.Io) !void {
// };
// switch (msg) {
// .MSG => |m| {
- // try self.to_client.print(
- // "MSG {s} {s} {s} {d}\r\n",
- // .{
- // m.subject,
- // m.sid,
- // m.reply_to orelse "",
- // m.payload.len,
- // },
- // );
- // try m.payload.write(self.to_client);
- // try self.to_client.print("\r\n", .{});
+
// },
// .HMSG => |hmsg| {
// try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
@@ -106,9 +97,7 @@ pub fn start(self: *Client, io: std.Io) !void {
// _ = try self.to_client.write("PONG\r\n");
// },
// .INFO => |info| {
- // _ = try self.to_client.write("INFO ");
- // try std.json.Stringify.value(info, .{}, self.to_client);
- // _ = try self.to_client.write("\r\n");
+
// },
// .@"-ERR" => |s| {
// _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
@@ -177,8 +166,8 @@ test send {
}
}
-pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
- return parse.next(allocator, self.from_client);
+pub fn next(self: *Client) !message.Control {
+ return parse.control(self.from_client);
}
test next {
diff --git a/src/Server/message.zig b/src/Server/message.zig
index f22410f..8005453 100644
--- a/src/Server/message.zig
+++ b/src/Server/message.zig
@@ -5,6 +5,8 @@ const Reader = std.Io.Reader;
const Payload = @import("Payload.zig");
+pub const parse = @import("parse.zig");
+
pub const Control = @typeInfo(Message).@"union".tag_type.?;
pub const Message = union(enum) {
@@ -108,14 +110,13 @@ pub const Message = union(enum) {
/// The message payload data.
payload: []const u8,
- pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
- const res: Msg = .{
+ pub fn toMsg(self: Pub, sid: []const u8) Msg {
+ return .{
.subject = self.subject,
.sid = sid,
.reply_to = self.reply_to,
.payload = self.payload,
};
- return res.dupe(alloc);
}
};
pub const HPub = struct {
diff --git a/src/Server/parse.zig b/src/Server/parse.zig
index 6e013c4..9035311 100644
--- a/src/Server/parse.zig
+++ b/src/Server/parse.zig
@@ -16,8 +16,8 @@ const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
const message = @import("./message.zig");
-pub const Message = message.Message;
-pub const Payload = @import("./Payload.zig");
+const Message = message.Message;
+const Payload = @import("./Payload.zig");
const client_control = StaticStringMap(message.Control).initComptime(
.{
@@ -44,8 +44,10 @@ pub fn control(in: *Reader) !message.Control {
}
break :blk min_len;
};
+ // log.debug("buffered: '{s}'", .{in.buffered()});
std.debug.assert(in.buffer.len >= longest_ctrl);
while (true) {
+ // log.debug("buffered l: '{s}'", .{in.buffered()});
var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
if (iter.next()) |str| {
if (client_control.get(str)) |ctrl| {
@@ -121,32 +123,39 @@ pub fn @"pub"(in: *Reader) !Message.Pub {
if (iter.next()) |subject| {
if (iter.next()) |second| {
- if (in.buffered()[iter.index] == '\r') {
- const bytes_str = second;
- const bytes = try parseUnsigned(usize, bytes_str, 10);
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const bytes_str = second;
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + bytes + 4);
- return .{
- .subject = subject,
- .reply_to = null,
- .payload = iter.rest()[1 .. 1 + bytes],
- };
- }
+ // 4 bytes for CRLF on either side of the payload.
+ _ = try in.take(iter.index + 2);
+ defer {
+ _ = in.take(2) catch {
+ log.warn("very bad parsing issue", .{});
+ };
+ }
+ return .{
+ .subject = subject,
+ .reply_to = null,
+ .payload = try in.take(bytes),
+ };
+ }
- const reply_to = second;
- if (iter.next()) |bytes_str| {
- const bytes = try parseUnsigned(usize, bytes_str, 10);
+ const reply_to = second;
+ if (iter.next()) |bytes_str| {
+ const bytes = try parseUnsigned(usize, bytes_str, 10);
- if (in.buffered()[iter.index] == '\r') {
- if (iter.rest().len > bytes) {
- // 4 bytes for CRLF on either side of the payload.
- in.toss(iter.index + bytes + 4);
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .payload = iter.rest()[1 .. 1 + bytes],
- };
+ if (in.buffered()[iter.index] == '\r') {
+ if (iter.rest().len > bytes + 2) {
+ // 4 bytes for CRLF on either side of the payload.
+ _ = try in.take(iter.index + bytes + 4);
+ return .{
+ .subject = subject,
+ .reply_to = reply_to,
+ .payload = iter.rest()[1 .. 1 + bytes],
+ };
+ }
}
}
}
@@ -188,8 +197,269 @@ test @"pub" {
);
try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
}
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2\r\nhi\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2" },
+ .{ .buffer = "\r\nhi\r\n " },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = "2" },
+ .{ .buffer = "\r\nhi\r" },
+ .{ .buffer = "\n " },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Pub{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "hi",
+ },
+ try @"pub"(&in.interface),
+ );
+ try std.testing.expectEqualSlices(u8, " ", in.interface.buffered());
+ }
}
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn sub(in: *Reader) !Message.Sub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |subject| {
+ if (iter.next()) |second| {
+ if (in.buffered().len > iter.index) {
+ if (in.buffered()[iter.index] == '\r') {
+ const sid = second;
+
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .subject = subject,
+ .queue_group = null,
+ .sid = sid,
+ };
+ }
+
+ const queue_group = second;
+ if (iter.next()) |sid| {
+ if (in.buffered()[iter.index] == '\r') {
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .subject = subject,
+ .queue_group = queue_group,
+ .sid = sid,
+ };
+ }
+ }
+ }
+ }
+ }
+
+ try in.fillMore();
+ }
+}
+
+test sub {
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo q 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = "q",
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r" },
+ .{ .buffer = "\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Sub{
+ .subject = "foo",
+ .queue_group = null,
+ .sid = "1",
+ },
+ try sub(&in.interface),
+ );
+ }
+}
+
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn unsub(in: *Reader) !Message.Unsub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+
+ while (true) {
+ var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r");
+
+ if (iter.next()) |sid| {
+ if (in.buffered()[iter.index] == '\r') {
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .sid = sid,
+ .max_msgs = null,
+ };
+ }
+ if (iter.next()) |max_msgs_str| {
+ if (in.buffered()[iter.index] == '\r') {
+ const max_msgs = try parseUnsigned(usize, max_msgs_str, 10);
+ // 2 bytes for CRLF at the end.
+ _ = try in.take(iter.index + 2);
+ return .{
+ .sid = sid,
+ .max_msgs = max_msgs,
+ };
+ }
+ }
+ }
+
+ try in.fillMore();
+ }
+}
+
+test unsub {
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = null,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+ {
+ var buf: [64]u8 = undefined;
+ var in: std.testing.Reader = .init(&buf, &.{
+ .{ .buffer = "foo " },
+ .{ .buffer = " 1\r" },
+ .{ .buffer = "\n" },
+ });
+ try std.testing.expectEqualDeep(
+ Message.Unsub{
+ .sid = "foo",
+ .max_msgs = 1,
+ },
+ try unsub(&in.interface),
+ );
+ }
+}
+
+/// The return value is owned by the reader passed to this function.
+/// Operations that modify the readers buffer invalidates this value.
+pub fn hpub(in: *Reader) !Message.HPub {
+ // TODO: Add pedantic option.
+ // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
+ _ = in;
+ @compileError("TODO");
+}
+
+test hpub {}
+
// /// Get the next Message from the input stream.
// pub fn next(alloc: Allocator, in: *Reader) !Message {
// errdefer log.err("Failed to parse {s}", .{operation_string.items});
@@ -218,35 +488,35 @@ test @"pub" {
// }
// }
-// pub fn connect(alloc: Allocator, in: *Reader) !Message {
-// // for storing the json string
-// var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
-// defer connect_string_writer_allocating.deinit();
-// var connect_string_writer = &connect_string_writer_allocating.writer;
+pub fn connect(alloc: Allocator, in: *Reader) !Message.Connect {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
-// // for parsing the json string
-// var connect_arena_allocator: ArenaAllocator = .init(alloc);
-// defer connect_arena_allocator.deinit();
-// const connect_allocator = connect_arena_allocator.allocator();
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
-// try in.discardAll(1); // throw away space
+ try in.discardAll(1); // throw away space
-// // Should read the next JSON object to the fixed buffer writer.
-// _ = try in.streamDelimiter(connect_string_writer, '}');
-// try connect_string_writer.writeByte('}');
-// try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
-
-// const connect_str = try connect_string_writer_allocating.toOwnedSlice();
-// defer alloc.free(connect_str);
-// const res = try std.json.parseFromSliceLeaky(
-// Message.Connect,
-// connect_allocator,
-// connect_str,
-// .{ .allocate = .alloc_always },
-// );
-
-// return .{ .CONNECT = try res.dupe(alloc) };
-// }
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return res.dupe(alloc);
+}
// pub fn sub(alloc: Allocator, in: *Reader) !Message {
// try in.discardAll(1); // throw away space
@@ -647,12 +917,12 @@ test @"pub" {
// return subject_list.toOwnedSlice(alloc);
// }
-// inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
-// if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
-// @branchHint(.unlikely);
-// return error.InvalidStream;
-// }
-// }
+inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
+ if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
+ @branchHint(.unlikely);
+ return error.InvalidStream;
+ }
+}
// test sub {
// const alloc = std.testing.allocator;
diff --git a/src/main.zig b/src/main.zig
index 79de5e7..411be6a 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -25,7 +25,7 @@ pub fn main() !void {
'a',
std.fmt.comptimePrint(
"Address to bind to (default: {s})",
- .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .host).defaultValue().?},
+ .{std.meta.fieldInfo(zits.Server.message.Message.ServerInfo, .host).defaultValue().?},
),
),
yazap.Arg.singleValueOption(
@@ -33,7 +33,7 @@ pub fn main() !void {
'p',
std.fmt.comptimePrint(
"Port to listen on (default: {d})",
- .{std.meta.fieldInfo(zits.Server.parse.Message.ServerInfo, .port).defaultValue().?},
+ .{std.meta.fieldInfo(zits.Server.message.Message.ServerInfo, .port).defaultValue().?},
),
),
yazap.Arg.singleValueOption(
@@ -54,7 +54,7 @@ pub fn main() !void {
const matches = try app.parseProcess(io);
if (matches.subcommandMatches("serve")) |serve_matches| {
- var info: zits.Server.parse.Message.ServerInfo = .{
+ var info: zits.Server.message.Message.ServerInfo = .{
.server_id = zits.Server.default_id,
.server_name = zits.Server.default_name,
.version = "zits-master",
diff --git a/src/subcommand/serve.zig b/src/subcommand/serve.zig
index 54258a9..ea6b9dc 100644
--- a/src/subcommand/serve.zig
+++ b/src/subcommand/serve.zig
@@ -9,7 +9,7 @@ const Threaded = Io.Threaded;
const builtin = @import("builtin");
const zits = @import("zits");
-const Message = zits.Server.parse.Message;
+const Message = zits.Server.message.Message;
const ServerInfo = Message.ServerInfo;
const Server = zits.Server;