summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/Client.zig150
-rw-r--r--src/server/Server.zig41
-rw-r--r--src/server/message_parser.zig60
3 files changed, 148 insertions, 103 deletions
diff --git a/src/server/Client.zig b/src/server/Client.zig
index 1e93360..8ff92e8 100644
--- a/src/server/Client.zig
+++ b/src/server/Client.zig
@@ -3,10 +3,16 @@ const std = @import("std");
const Client = @This();
+pub const Msgs = union(enum) {
+ MSG: Message.Msg,
+ HMSG: Message.HMsg,
+};
+
connect: ?Message.Connect,
// Messages for this client to receive.
recv_queue: *std.Io.Queue(Message),
+msg_queue: *std.Io.Queue(Msgs),
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
@@ -14,12 +20,14 @@ to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
recv_queue: *std.Io.Queue(Message),
+ msg_queue: *std.Io.Queue(Msgs),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
return .{
.connect = connect,
.recv_queue = recv_queue,
+ .msg_queue = msg_queue,
.from_client = in,
.to_client = out,
};
@@ -33,72 +41,98 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
}
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
- var msgs: [256]Message = undefined;
+ var msgs_buf: [1024]Msgs = undefined;
+
+ var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable");
+ errdefer _ = recv_msgs_task.cancel(io) catch {};
+
+ var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
+ errdefer _ = recv_proto_task.cancel(io) catch {};
while (true) {
- const len = try self.recv_queue.get(io, &msgs, 1);
- std.debug.assert(len <= msgs.len);
- for (0..len) |i| {
- const msg = msgs[i];
- defer switch (msg) {
- .MSG => |m| m.deinit(alloc),
- .HMSG => |h| h.deinit(alloc),
- else => {},
- };
- errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
- .MSG => |m| {
- m.deinit(alloc);
- },
- else => {},
- };
- switch (msg) {
- .@"+OK" => {
- _ = try self.to_client.write("+OK\r\n");
- },
- .PONG => {
- _ = try self.to_client.write("PONG\r\n");
- },
- .INFO => |info| {
- _ = try self.to_client.write("INFO ");
- try std.json.Stringify.value(info, .{}, self.to_client);
- _ = try self.to_client.write("\r\n");
- },
- .MSG => |m| {
- @branchHint(.likely);
- try self.to_client.print(
- "MSG {s} {s} {s} {d}\r\n{s}\r\n",
- .{
- m.subject,
- m.sid,
- m.reply_to orelse "",
- m.payload.len,
- m.payload,
+ switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
+ .msgs => |msgs_err| {
+ @branchHint(.likely);
+ defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable;
+ const msgs = try msgs_err;
+ for (0..msgs.len) |i| {
+ const msg = msgs[i];
+ defer switch (msg) {
+ .MSG => |m| m.deinit(alloc),
+ .HMSG => |h| h.deinit(alloc),
+ };
+ errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
+ .MSG => |m| {
+ m.deinit(alloc);
},
- );
- },
- .HMSG => |hmsg| {
- @branchHint(.likely);
- try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
- hmsg.msg.subject,
- hmsg.msg.sid,
- hmsg.msg.reply_to orelse "",
- hmsg.header_bytes,
- hmsg.msg.payload.len,
- hmsg.msg.payload,
- });
- },
- .@"-ERR" => |s| {
- _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
- },
- else => |m| {
- std.debug.panic("unimplemented write: {any}\n", .{m});
- },
- }
+ .HMSG => |h| {
+ h.deinit(alloc);
+ },
+ };
+ switch (msg) {
+ .MSG => |m| {
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n{s}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ m.payload,
+ },
+ );
+ },
+ .HMSG => |hmsg| {
+ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
+ hmsg.msg.subject,
+ hmsg.msg.sid,
+ hmsg.msg.reply_to orelse "",
+ hmsg.header_bytes,
+ hmsg.msg.payload.len,
+ hmsg.msg.payload,
+ });
+ },
+ }
+ }
+ },
+ .proto => |msg_err| {
+ @branchHint(.unlikely);
+ defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
+ const msg = try msg_err;
+ switch (msg) {
+ .@"+OK" => {
+ _ = try self.to_client.write("+OK\r\n");
+ },
+ .PONG => {
+ _ = try self.to_client.write("PONG\r\n");
+ },
+ .INFO => |info| {
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
+ },
+ .@"-ERR" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
+ },
+ else => |m| {
+ std.debug.panic("unimplemented write: {any}\n", .{m});
+ },
+ }
+ },
}
try self.to_client.flush();
}
}
+fn recvProtoMsg(self: *Client, io: std.Io) !Message {
+ return self.recv_queue.getOne(io);
+}
+
+fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs {
+ const len = try self.msg_queue.get(io, buf, 1);
+ return buf[0..len];
+}
+
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
try self.recv_queue.putOne(io, msg);
}
diff --git a/src/server/Server.zig b/src/server/Server.zig
index f7b849c..3fedfcb 100644
--- a/src/server/Server.zig
+++ b/src/server/Server.zig
@@ -16,13 +16,14 @@ pub const MessageType = message_parser.MessageType;
pub const Message = message_parser.Message;
const ServerInfo = Message.ServerInfo;
pub const Client = @import("./Client.zig");
+const Msgs = Client.Msgs;
const Server = @This();
pub const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
- queue: *Queue(Message),
+ queue: *Queue(Msgs),
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
@@ -152,29 +153,34 @@ fn handleConnection(
const in = &reader.interface;
// Set up buffer queue
- const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message));
+ const qbuf: []Message = try server_allocator.alloc(Message, 16);
defer server_allocator.free(qbuf);
- var queue: Queue(Message) = .init(qbuf);
+ var recv_queue: Queue(Message) = .init(qbuf);
+ defer recv_queue.close(io);
+
+ const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128));
+ defer server_allocator.free(mbuf);
+ var msgs_queue: Queue(Msgs) = .init(mbuf);
defer {
- queue.close(io);
- while (queue.getOne(io)) |msg| {
+ msgs_queue.close(io);
+ while (msgs_queue.getOne(io)) |msg| {
switch (msg) {
.MSG => |m| m.deinit(server_allocator),
.HMSG => |h| h.deinit(server_allocator),
- else => {},
}
} else |_| {}
}
// Create client
- var client: Client = .init(null, &queue, in, out);
+ var client: Client = .init(null, &recv_queue, &msgs_queue, in, out);
defer client.deinit(server_allocator);
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client
- try queue.putOne(io, .{ .INFO = server.info });
+ // try recv_queue.putOne(io, .PONG);
+ try recv_queue.putOne(io, .{ .INFO = server.info });
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator });
defer client_task.cancel(io) catch {};
@@ -186,18 +192,19 @@ fn handleConnection(
// Respond to ping with pong.
try client.send(io, .PONG);
},
- .PUB, .HPUB => {
+ .PUB => |pb| {
+ @branchHint(.likely);
+ defer pb.deinit(server_allocator);
+ try server.publishMessage(io, server_allocator, &client, msg);
+ },
+ .HPUB => |hp| {
@branchHint(.likely);
- defer switch (msg) {
- .PUB => |pb| pb.deinit(server_allocator),
- .HPUB => |hp| hp.deinit(server_allocator),
- else => unreachable,
- };
+ defer hp.deinit(server_allocator);
try server.publishMessage(io, server_allocator, &client, msg);
},
.SUB => |sub| {
defer sub.deinit(server_allocator);
- try server.subscribe(io, server_allocator, id, &queue, sub);
+ try server.subscribe(io, server_allocator, id, &msgs_queue, sub);
},
.UNSUB => |unsub| {
defer unsub.deinit(server_allocator);
@@ -295,7 +302,7 @@ fn subscribe(
io: Io,
gpa: Allocator,
id: usize,
- queue: *Queue(Message),
+ queue: *Queue(Msgs),
msg: Message.Sub,
) !void {
try server.subs_lock.lock(io);
@@ -339,7 +346,7 @@ fn getBufferSizes(io: Io) struct { usize, usize } {
const default = .{ default_size, default_size };
const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch {
- log.err("couldn't open /proc/sys/net/core", .{});
+ log.warn("couldn't open /proc/sys/net/core", .{});
return default;
};
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index 8b4859b..ff1a573 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -268,34 +268,7 @@ pub const Message = union(enum) {
switch (operation) {
.CONNECT => {
- // for storing the json string
- var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
- defer connect_string_writer_allocating.deinit();
- var connect_string_writer = &connect_string_writer_allocating.writer;
-
- // for parsing the json string
- var connect_arena_allocator: ArenaAllocator = .init(alloc);
- defer connect_arena_allocator.deinit();
- const connect_allocator = connect_arena_allocator.allocator();
-
- try in.discardAll(1); // throw away space
-
- // Should read the next JSON object to the fixed buffer writer.
- _ = try in.streamDelimiter(connect_string_writer, '}');
- try connect_string_writer.writeByte('}');
- try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
-
- const connect_str = try connect_string_writer_allocating.toOwnedSlice();
- defer alloc.free(connect_str);
- // TODO: should be CONNECTION allocator
- const res = try std.json.parseFromSliceLeaky(
- Connect,
- connect_allocator,
- connect_str,
- .{ .allocate = .alloc_always },
- );
-
- return .{ .CONNECT = try res.dupe(alloc) };
+ return parseConnect(alloc, in);
},
.PUB => {
@branchHint(.likely);
@@ -324,6 +297,37 @@ pub const Message = union(enum) {
}
};
+fn parseConnect(alloc: Allocator, in: *Reader) !Message {
+ // for storing the json string
+ var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
+ defer connect_string_writer_allocating.deinit();
+ var connect_string_writer = &connect_string_writer_allocating.writer;
+
+ // for parsing the json string
+ var connect_arena_allocator: ArenaAllocator = .init(alloc);
+ defer connect_arena_allocator.deinit();
+ const connect_allocator = connect_arena_allocator.allocator();
+
+ try in.discardAll(1); // throw away space
+
+ // Should read the next JSON object to the fixed buffer writer.
+ _ = try in.streamDelimiter(connect_string_writer, '}');
+ try connect_string_writer.writeByte('}');
+ try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
+
+ const connect_str = try connect_string_writer_allocating.toOwnedSlice();
+ defer alloc.free(connect_str);
+ // TODO: should be CONNECTION allocator
+ const res = try std.json.parseFromSliceLeaky(
+ Message.Connect,
+ connect_allocator,
+ connect_str,
+ .{ .allocate = .alloc_always },
+ );
+
+ return .{ .CONNECT = try res.dupe(alloc) };
+}
+
fn parseSub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);