From cd5281030ee6cede5a39f8360d47c6c9ed9269d3 Mon Sep 17 00:00:00 2001 From: Robby Zambito Date: Fri, 2 Jan 2026 16:01:35 +0000 Subject: --- src/main.zig | 1 + src/server/client.zig | 11 ++ src/server/main.zig | 46 +++++-- src/server/message_parser.zig | 291 +++++++++++++++++++++++++++++++++++------- 4 files changed, 295 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/main.zig b/src/main.zig index a1414c8..7e4974c 100644 --- a/src/main.zig +++ b/src/main.zig @@ -57,6 +57,7 @@ pub fn main() !void { .server_name = zits.Server.createName(), .version = "zits-master", .max_payload = 1048576, + .headers = true, }; if (serve_matches.getSingleValue("port")) |port| { info.port = std.fmt.parseUnsigned(@TypeOf(info.port), port, 10) catch |err| std.process.fatal("Could not parse port ({s}): {}\n", .{ port, err }); diff --git a/src/server/client.zig b/src/server/client.zig index 23a0c9d..37fc60c 100644 --- a/src/server/client.zig +++ b/src/server/client.zig @@ -76,6 +76,17 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io }, ); }, + .hmsg => |hmsg| { + std.log.debug("Sending hmsg: {any}", .{hmsg}); + try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ + hmsg.msg.subject, + hmsg.msg.sid, + hmsg.msg.reply_to orelse "", + hmsg.header_bytes, + hmsg.msg.payload.len, + hmsg.msg.payload, + }); + }, .@"-err" => |s| { _ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); }, diff --git a/src/server/main.zig b/src/server/main.zig index 81c8fbd..ba74987 100644 --- a/src/server/main.zig +++ b/src/server/main.zig @@ -83,6 +83,10 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void { server.info.port, ), io, .{ .reuse_address = true }); defer tcp_server.deinit(io); + std.log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"}); + std.log.debug("Server max payload: {d}", .{server.info.max_payload}); + std.log.info("Server ID: {s}", .{server.info.server_id}); + std.log.info("Server name: {s}", .{server.info.server_name}); std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port }); var client_group: std.Io.Group = .init; @@ -185,9 +189,13 @@ fn handleConnection( // Respond to ping with pong. try client.send(io, .pong); }, - .@"pub" => |pb| { - defer pb.deinit(server_allocator); - try server.publishMessage(io, server_allocator, &client, pb); + .@"pub", .hpub => { + defer switch (msg) { + .@"pub" => |pb| pb.deinit(server_allocator), + .hpub => |hp| hp.deinit(server_allocator), + else => unreachable, + }; + try server.publishMessage(io, server_allocator, &client, msg); }, .sub => |sub| { try server.subscribe(io, server_allocator, id, sub); @@ -248,7 +256,7 @@ test subjectMatches { try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz")); } -fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { +fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message) !void { errdefer { if (source_client.connect) |c| { if (c.verbose) { @@ -256,20 +264,36 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_ } } } + const subject = switch (msg) { + .@"pub" => |pb| pb.subject, + .hpub => |hp| hp.@"pub".subject, + else => unreachable, + }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |subscription| { - if (subjectMatches(subscription.subject, msg.subject)) { + if (subjectMatches(subscription.subject, subject)) { const client = server.clients.get(subscription.client_id) orelse { std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); continue; }; - client.send(io, .{ - .msg = try msg.toMsg(alloc, subscription.sid), - }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, - }; + + switch (msg) { + .@"pub" => |pb| client.send(io, .{ + .msg = try pb.toMsg(alloc, subscription.sid), + }) catch |err| switch (err) { + error.Canceled => return err, + else => {}, + }, + .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( + alloc, + subscription.sid, + ) }) catch |err| switch (err) { + error.Canceled => return err, + else => {}, + }, + else => unreachable, + } } } if (source_client.connect) |c| { diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig index 68e7a20..5c86261 100644 --- a/src/server/message_parser.zig +++ b/src/server/message_parser.zig @@ -35,11 +35,11 @@ pub const Message = union(MessageType) { info: ServerInfo, connect: Connect, @"pub": Pub, - hpub: void, + hpub: HPub, sub: Sub, unsub: Unsub, msg: Msg, - hmsg: void, + hmsg: HMsg, ping, pong, @"+ok": void, @@ -148,6 +148,36 @@ pub const Message = union(MessageType) { return res.dupe(alloc); } }; + pub const HPub = struct { + header_bytes: usize, + @"pub": Pub, + + pub fn deinit(self: HPub, alloc: std.mem.Allocator) void { + self.@"pub".deinit(alloc); + } + + pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg { + return .{ + .header_bytes = self.header_bytes, + .msg = try self.@"pub".toMsg(alloc, sid), + }; + } + }; + + pub const HMsg = struct { + header_bytes: usize, + msg: Msg, + + pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void { + self.msg.deinit(alloc); + } + + pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg { + var res = self; + res.msg = try self.msg.dupe(alloc); + return res; + } + }; pub const Sub = struct { /// The subject name to subscribe to. subject: []const u8, @@ -235,6 +265,7 @@ pub const Message = union(MessageType) { return error.InvalidOperation; }; + std.log.debug("parsing {s}", .{operation_string.items}); switch (operation) { .connect => { // for storing the json string @@ -265,47 +296,10 @@ pub const Message = union(MessageType) { return .{ .connect = try res.dupe(alloc) }; }, .@"pub" => { - try in.discardAll(1); // throw away space - - // Parse subject - const subject: []const u8 = try readSubject(alloc, in); - errdefer alloc.free(subject); - - // Parse byte count - const byte_count = blk: { - var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); - defer byte_count_list.deinit(alloc); - while (in.peekByte()) |byte| { - if (std.ascii.isWhitespace(byte)) { - try expectStreamBytes(in, "\r\n"); - break; - } - defer in.toss(1); - - if (std.ascii.isDigit(byte)) { - try byte_count_list.append(alloc, byte); - } else { - return error.InvalidStream; - } - } else |err| return err; - - break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); - }; - - const payload = blk: { - const bytes = try alloc.alloc(u8, byte_count); - errdefer alloc.free(bytes); - try in.readSliceAll(bytes); - try expectStreamBytes(in, "\r\n"); - break :blk bytes; - }; - - return .{ - .@"pub" = .{ - .subject = subject, - .payload = payload, - }, - }; + return parsePub(alloc, in); + }, + .hpub => { + return parseHPub(alloc, in); }, .ping => { try expectStreamBytes(in, "\r\n"); @@ -414,6 +408,217 @@ pub const Message = union(MessageType) { } }; +fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in); + errdefer alloc.free(subject); + + const second = blk: { + // Drop whitespace + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + } else break; + } else |err| return err; + + var acc: std.ArrayList(u8) = .empty; + errdefer acc.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + in.toss(1); + } else |err| return err; + + break :blk try acc.toOwnedSlice(alloc); + }; + defer alloc.free(second); + + const byte_count: usize, const reply_to: ?[]const u8 = + if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { + try expectStreamBytes(in, "\r\n"); + break :blk .{ s, null }; + } else |_| .{ + blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + try in.discardAll(1); // discard space + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }, + try alloc.dupe(u8, second), + }; + + const payload = blk: { + const bytes = try alloc.alloc(u8, byte_count); + errdefer alloc.free(bytes); + try in.readSliceAll(bytes); + try expectStreamBytes(in, "\r\n"); + break :blk bytes; + }; + + return .{ + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }; +} + +test parsePub { + { + var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = null, + .payload = "bar", + } }, + res, + ); + } + + { + var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "bar", + } }, + res, + ); + } + + // numeric reply subject + { + var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n"); + var res = try parsePub(std.testing.allocator, &in); + defer res.@"pub".deinit(std.testing.allocator); + try std.testing.expectEqualDeep( + Message{ .@"pub" = .{ + .subject = "foo", + .reply_to = "reply.to", + .payload = "bar", + } }, + res, + ); + } +} + +fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { + try in.discardAll(1); // throw away space + + // Parse subject + const subject: []const u8 = try readSubject(alloc, in); + errdefer alloc.free(subject); + + const second = blk: { + // Drop whitespace + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + in.toss(1); + } else break; + } else |err| return err; + + var acc: std.ArrayList(u8) = .empty; + errdefer acc.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) break; + try acc.append(alloc, byte); + in.toss(1); + } else |err| return err; + + break :blk try acc.toOwnedSlice(alloc); + }; + errdefer alloc.free(second); + + const header_byte_count: usize, const reply_to: ?[]const u8 = + if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { + try expectStreamBytes(in, "\r\n"); + break :blk .{ s, null }; + } else |_| .{ + blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + try in.discardAll(1); // discard space + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }, + second, + }; + + std.log.debug("buffered: '{s}'", .{in.buffered()}); + + // Parse byte count + const byte_count = blk: { + var byte_count_list: std.ArrayList(u8) = .empty; + defer byte_count_list.deinit(alloc); + while (in.peekByte()) |byte| { + if (std.ascii.isWhitespace(byte)) { + try expectStreamBytes(in, "\r\n"); + break; + } + defer in.toss(1); + + if (std.ascii.isDigit(byte)) { + try byte_count_list.append(alloc, byte); + } else { + return error.InvalidStream; + } + } else |err| return err; + + break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10); + }; + + const payload = blk: { + const bytes = try alloc.alloc(u8, byte_count); + errdefer alloc.free(bytes); + try in.readSliceAll(bytes); + try expectStreamBytes(in, "\r\n"); + break :blk bytes; + }; + + return .{ + .hpub = .{ + .header_bytes = header_byte_count, + .@"pub" = .{ + .subject = subject, + .payload = payload, + .reply_to = reply_to, + }, + }, + }; +} + fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); errdefer subject_list.deinit(alloc); -- cgit