summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/Client.zig10
-rw-r--r--src/server/Server.zig2
-rw-r--r--src/server/message_parser.zig75
3 files changed, 68 insertions, 19 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
index 8ff92e8..4eceb89 100644
--- a/src/server/Client.zig
+++ b/src/server/Client.zig
@@ -72,25 +72,27 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
switch (msg) {
.MSG => |m| {
try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n{s}\r\n",
+ "MSG {s} {s} {s} {d}\r\n",
.{
m.subject,
m.sid,
m.reply_to orelse "",
m.payload.len,
- m.payload,
},
);
+ try m.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
},
.HMSG => |hmsg| {
- try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
hmsg.msg.subject,
hmsg.msg.sid,
hmsg.msg.reply_to orelse "",
hmsg.header_bytes,
hmsg.msg.payload.len,
- hmsg.msg.payload,
});
+ try hmsg.msg.payload.write(self.to_client);
+ try self.to_client.print("\r\n", .{});
},
}
}
diff --git a/src/server/Server.zig b/src/server/Server.zig
index 3fedfcb..6b1eda5 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -158,7 +158,7 @@ fn handleConnection(
var recv_queue: Queue(Message) = .init(qbuf);
defer recv_queue.close(io);
- const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128));
+ const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
defer server_allocator.free(mbuf);
var msgs_queue: Queue(Msgs) = .init(mbuf);
defer {
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index ff1a573..81b3858 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -5,7 +5,8 @@ const ArrayList = std.ArrayList;
const StaticStringMap = std.StaticStringMap;
const Io = std.Io;
-const AllocatingWriter = Io.Writer.Allocating;
+const Writer = Io.Writer;
+const AllocatingWriter = Writer.Allocating;
const Reader = Io.Reader;
const ascii = std.ascii;
@@ -17,6 +18,53 @@ const parseUnsigned = std.fmt.parseUnsigned;
const log = std.log;
+pub const Payload = struct {
+ len: u32,
+ short: [128]u8,
+ long: ?[]u8,
+
+ pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload {
+ var res: Payload = .{
+ .len = @intCast(bytes),
+ .short = undefined,
+ .long = null,
+ };
+
+ try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]);
+ if (bytes > res.short.len) {
+ const long = try alloc.alloc(u8, bytes - res.short.len);
+ errdefer alloc.free(long);
+ try in.readSliceAll(long);
+ res.long = long;
+ }
+ return res;
+ }
+
+ pub fn write(self: Payload, out: *Writer) !void {
+ std.debug.assert(out.buffer.len >= self.short.len);
+ std.debug.assert(self.len <= self.short.len or self.long != null);
+ try out.writeAll(self.short[0..@min(self.len, self.short.len)]);
+ if (self.long) |l| {
+ try out.writeAll(l);
+ }
+ }
+
+ pub fn deinit(self: Payload, alloc: Allocator) void {
+ if (self.long) |l| {
+ alloc.free(l);
+ }
+ }
+
+ pub fn dupe(self: Payload, alloc: Allocator) !Payload {
+ var res = self;
+ if (self.long) |l| {
+ res.long = try alloc.dupe(u8, l);
+ }
+ errdefer if (res.long) |l| alloc.free(l);
+ return res;
+ }
+};
+
pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
pub const Message = union(enum) {
@@ -118,11 +166,11 @@ pub const Message = union(enum) {
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
reply_to: ?[]const u8 = null,
/// The message payload data.
- payload: []const u8,
+ payload: Payload,
pub fn deinit(self: Pub, alloc: Allocator) void {
alloc.free(self.subject);
- alloc.free(self.payload);
+ self.payload.deinit(alloc);
if (self.reply_to) |r| alloc.free(r);
}
@@ -163,6 +211,7 @@ pub const Message = union(enum) {
pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
var res = self;
res.msg = try self.msg.dupe(alloc);
+ errdefer alloc.free(res.msg);
return res;
}
};
@@ -194,13 +243,13 @@ pub const Message = union(enum) {
subject: []const u8,
sid: []const u8,
reply_to: ?[]const u8,
- payload: []const u8,
+ payload: Payload,
pub fn deinit(self: Msg, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
if (self.reply_to) |r| alloc.free(r);
- alloc.free(self.payload);
+ self.payload.deinit(alloc);
}
pub fn dupe(self: Msg, alloc: Allocator) !Msg {
@@ -211,7 +260,7 @@ pub const Message = union(enum) {
errdefer alloc.free(res.sid);
res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
errdefer if (res.reply_to) |r| alloc.free(r);
- res.payload = try alloc.dupe(u8, self.payload);
+ res.payload = try self.payload.dupe(alloc);
errdefer alloc.free(res.payload);
return res;
}
@@ -613,8 +662,6 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
defer second.deinit(alloc);
var third: ?ArrayList(u8) = null;
defer if (third) |*t| t.deinit(alloc);
- var payload: AllocatingWriter = .init(alloc);
- errdefer payload.deinit();
sw: switch (@as(States, .before_second)) {
.before_second => {
@@ -671,13 +718,14 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
try parseUnsigned(usize, second.items, 10),
};
- try in.streamExact(&payload.writer, bytes);
+ const payload: Payload = try .read(alloc, in, bytes);
+ errdefer payload.deinit(alloc);
try expectStreamBytes(in, "\r\n");
return .{
.PUB = .{
.subject = subject,
- .payload = try payload.toOwnedSlice(),
+ .payload = payload,
.reply_to = reply_to,
},
};
@@ -763,8 +811,6 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
defer third.deinit(alloc);
var fourth: ?ArrayList(u8) = null;
defer if (fourth) |*f| f.deinit(alloc);
- var payload: AllocatingWriter = .init(alloc);
- errdefer payload.deinit();
sw: switch (@as(States, .before_second)) {
.before_second => {
@@ -843,7 +889,8 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
try parseUnsigned(usize, third.items, 10),
};
- try in.streamExact(&payload.writer, total_bytes);
+ const payload: Payload = try .read(alloc, in, total_bytes);
+ errdefer payload.deinit(alloc);
try expectStreamBytes(in, "\r\n");
return .{
@@ -851,7 +898,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
.header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,
- .payload = try payload.toOwnedSlice(),
+ .payload = payload,
.reply_to = reply_to,
},
},