summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.zig1
-rw-r--r--src/server/client.zig11
-rw-r--r--src/server/main.zig46
-rw-r--r--src/server/message_parser.zig291
4 files changed, 295 insertions, 54 deletions
diff --git a/src/main.zig b/src/main.zig
index a1414c8..7e4974c 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -57,6 +57,7 @@ pub fn main() !void {
.server_name = zits.Server.createName(),
.version = "zits-master",
.max_payload = 1048576,
+ .headers = true,
};
if (serve_matches.getSingleValue("port")) |port| {
info.port = std.fmt.parseUnsigned(@TypeOf(info.port), port, 10) catch |err| std.process.fatal("Could not parse port ({s}): {}\n", .{ port, err });
diff --git a/src/server/client.zig b/src/server/client.zig
index 23a0c9d..37fc60c 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -76,6 +76,17 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
},
);
},
+ .hmsg => |hmsg| {
+ std.log.debug("Sending hmsg: {any}", .{hmsg});
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
+ hmsg.msg.subject,
+ hmsg.msg.sid,
+ hmsg.msg.reply_to orelse "",
+ hmsg.header_bytes,
+ hmsg.msg.payload.len,
+ hmsg.msg.payload,
+ });
+ },
.@"-err" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
diff --git a/src/server/main.zig b/src/server/main.zig
index 81c8fbd..ba74987 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -83,6 +83,10 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
server.info.port,
), io, .{ .reuse_address = true });
defer tcp_server.deinit(io);
+ std.log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
+ std.log.debug("Server max payload: {d}", .{server.info.max_payload});
+ std.log.info("Server ID: {s}", .{server.info.server_id});
+ std.log.info("Server name: {s}", .{server.info.server_name});
std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port });
var client_group: std.Io.Group = .init;
@@ -185,9 +189,13 @@ fn handleConnection(
// Respond to ping with pong.
try client.send(io, .pong);
},
- .@"pub" => |pb| {
- defer pb.deinit(server_allocator);
- try server.publishMessage(io, server_allocator, &client, pb);
+ .@"pub", .hpub => {
+ defer switch (msg) {
+ .@"pub" => |pb| pb.deinit(server_allocator),
+ .hpub => |hp| hp.deinit(server_allocator),
+ else => unreachable,
+ };
+ try server.publishMessage(io, server_allocator, &client, msg);
},
.sub => |sub| {
try server.subscribe(io, server_allocator, id, sub);
@@ -248,7 +256,7 @@ test subjectMatches {
try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz"));
}
-fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
+fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
@@ -256,20 +264,36 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
}
}
}
+ const subject = switch (msg) {
+ .@"pub" => |pb| pb.subject,
+ .hpub => |hp| hp.@"pub".subject,
+ else => unreachable,
+ };
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| {
- if (subjectMatches(subscription.subject, msg.subject)) {
+ if (subjectMatches(subscription.subject, subject)) {
const client = server.clients.get(subscription.client_id) orelse {
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
continue;
};
- client.send(io, .{
- .msg = try msg.toMsg(alloc, subscription.sid),
- }) catch |err| switch (err) {
- error.Canceled => return err,
- else => {},
- };
+
+ switch (msg) {
+ .@"pub" => |pb| client.send(io, .{
+ .msg = try pb.toMsg(alloc, subscription.sid),
+ }) catch |err| switch (err) {
+ error.Canceled => return err,
+ else => {},
+ },
+ .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
+ alloc,
+ subscription.sid,
+ ) }) catch |err| switch (err) {
+ error.Canceled => return err,
+ else => {},
+ },
+ else => unreachable,
+ }
}
}
if (source_client.connect) |c| {
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 68e7a20..5c86261 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -35,11 +35,11 @@ pub const Message = union(MessageType) {
info: ServerInfo,
connect: Connect,
@"pub": Pub,
- hpub: void,
+ hpub: HPub,
sub: Sub,
unsub: Unsub,
msg: Msg,
- hmsg: void,
+ hmsg: HMsg,
ping,
pong,
@"+ok": void,
@@ -148,6 +148,36 @@ pub const Message = union(MessageType) {
return res.dupe(alloc);
}
};
+ pub const HPub = struct {
+ header_bytes: usize,
+ @"pub": Pub,
+
+ pub fn deinit(self: HPub, alloc: std.mem.Allocator) void {
+ self.@"pub".deinit(alloc);
+ }
+
+ pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg {
+ return .{
+ .header_bytes = self.header_bytes,
+ .msg = try self.@"pub".toMsg(alloc, sid),
+ };
+ }
+ };
+
+ pub const HMsg = struct {
+ header_bytes: usize,
+ msg: Msg,
+
+ pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void {
+ self.msg.deinit(alloc);
+ }
+
+ pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg {
+ var res = self;
+ res.msg = try self.msg.dupe(alloc);
+ return res;
+ }
+ };
pub const Sub = struct {
/// The subject name to subscribe to.
subject: []const u8,
@@ -235,6 +265,7 @@ pub const Message = union(MessageType) {
return error.InvalidOperation;
};
+ std.log.debug("parsing {s}", .{operation_string.items});
switch (operation) {
.connect => {
// for storing the json string
@@ -265,47 +296,10 @@ pub const Message = union(MessageType) {
return .{ .connect = try res.dupe(alloc) };
},
.@"pub" => {
- try in.discardAll(1); // throw away space
-
- // Parse subject
- const subject: []const u8 = try readSubject(alloc, in);
- errdefer alloc.free(subject);
-
- // Parse byte count
- const byte_count = blk: {
- var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
- defer byte_count_list.deinit(alloc);
- while (in.peekByte()) |byte| {
- if (std.ascii.isWhitespace(byte)) {
- try expectStreamBytes(in, "\r\n");
- break;
- }
- defer in.toss(1);
-
- if (std.ascii.isDigit(byte)) {
- try byte_count_list.append(alloc, byte);
- } else {
- return error.InvalidStream;
- }
- } else |err| return err;
-
- break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
- };
-
- const payload = blk: {
- const bytes = try alloc.alloc(u8, byte_count);
- errdefer alloc.free(bytes);
- try in.readSliceAll(bytes);
- try expectStreamBytes(in, "\r\n");
- break :blk bytes;
- };
-
- return .{
- .@"pub" = .{
- .subject = subject,
- .payload = payload,
- },
- };
+ return parsePub(alloc, in);
+ },
+ .hpub => {
+ return parseHPub(alloc, in);
},
.ping => {
try expectStreamBytes(in, "\r\n");
@@ -414,6 +408,217 @@ pub const Message = union(MessageType) {
}
};
+fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in);
+ errdefer alloc.free(subject);
+
+ const second = blk: {
+ // Drop whitespace
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ var acc: std.ArrayList(u8) = .empty;
+ errdefer acc.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+ defer alloc.free(second);
+
+ const byte_count: usize, const reply_to: ?[]const u8 =
+ if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
+ try expectStreamBytes(in, "\r\n");
+ break :blk .{ s, null };
+ } else |_| .{
+ blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ try in.discardAll(1); // discard space
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ },
+ try alloc.dupe(u8, second),
+ };
+
+ const payload = blk: {
+ const bytes = try alloc.alloc(u8, byte_count);
+ errdefer alloc.free(bytes);
+ try in.readSliceAll(bytes);
+ try expectStreamBytes(in, "\r\n");
+ break :blk bytes;
+ };
+
+ return .{
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ };
+}
+
+test parsePub {
+ {
+ var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = null,
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+
+ {
+ var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+
+ // numeric reply subject
+ {
+ var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n");
+ var res = try parsePub(std.testing.allocator, &in);
+ defer res.@"pub".deinit(std.testing.allocator);
+ try std.testing.expectEqualDeep(
+ Message{ .@"pub" = .{
+ .subject = "foo",
+ .reply_to = "reply.to",
+ .payload = "bar",
+ } },
+ res,
+ );
+ }
+}
+
+fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
+ try in.discardAll(1); // throw away space
+
+ // Parse subject
+ const subject: []const u8 = try readSubject(alloc, in);
+ errdefer alloc.free(subject);
+
+ const second = blk: {
+ // Drop whitespace
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ in.toss(1);
+ } else break;
+ } else |err| return err;
+
+ var acc: std.ArrayList(u8) = .empty;
+ errdefer acc.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) break;
+ try acc.append(alloc, byte);
+ in.toss(1);
+ } else |err| return err;
+
+ break :blk try acc.toOwnedSlice(alloc);
+ };
+ errdefer alloc.free(second);
+
+ const header_byte_count: usize, const reply_to: ?[]const u8 =
+ if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
+ try expectStreamBytes(in, "\r\n");
+ break :blk .{ s, null };
+ } else |_| .{
+ blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ try in.discardAll(1); // discard space
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ },
+ second,
+ };
+
+ std.log.debug("buffered: '{s}'", .{in.buffered()});
+
+ // Parse byte count
+ const byte_count = blk: {
+ var byte_count_list: std.ArrayList(u8) = .empty;
+ defer byte_count_list.deinit(alloc);
+ while (in.peekByte()) |byte| {
+ if (std.ascii.isWhitespace(byte)) {
+ try expectStreamBytes(in, "\r\n");
+ break;
+ }
+ defer in.toss(1);
+
+ if (std.ascii.isDigit(byte)) {
+ try byte_count_list.append(alloc, byte);
+ } else {
+ return error.InvalidStream;
+ }
+ } else |err| return err;
+
+ break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
+ };
+
+ const payload = blk: {
+ const bytes = try alloc.alloc(u8, byte_count);
+ errdefer alloc.free(bytes);
+ try in.readSliceAll(bytes);
+ try expectStreamBytes(in, "\r\n");
+ break :blk bytes;
+ };
+
+ return .{
+ .hpub = .{
+ .header_bytes = header_byte_count,
+ .@"pub" = .{
+ .subject = subject,
+ .payload = payload,
+ .reply_to = reply_to,
+ },
+ },
+ };
+}
+
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
errdefer subject_list.deinit(alloc);