summaryrefslogtreecommitdiff
path: root/src/Server.zig
diff options
context:
space:
mode:
authorRobby Zambito <contact@robbyzambito.me>2026-01-07 22:48:50 -0500
committerRobby Zambito <contact@robbyzambito.me>2026-01-07 23:19:19 -0500
commit45feccbad8c7306c15137a6003f3df1183d9c2a9 (patch)
tree5a541a2e45eb2fbe8f0ec4ba3da0829d029ccd45 /src/Server.zig
parent96a3705069cf33a00ded143f876734c2a045cf1e (diff)
WAY FASTER but doesn't send all?
Seems to not flush the last message
Diffstat (limited to 'src/Server.zig')
-rw-r--r--src/Server.zig141
1 files changed, 84 insertions, 57 deletions
diff --git a/src/Server.zig b/src/Server.zig
index 21eecb4..b5f9ee9 100644
--- a/src/Server.zig
+++ b/src/Server.zig
@@ -13,10 +13,11 @@ const Stream = std.Io.net.Stream;
pub const Client = @import("./Server/Client.zig");
-pub const parse = @import("./Server/parse.zig");
+pub const message = @import("./Server/message.zig");
+const parse = message.parse;
-const MessageType = parse.MessageType;
-const Message = parse.Message;
+const MessageType = message.Control;
+const Message = message.Message;
const ServerInfo = Message.ServerInfo;
const Msgs = Client.Msgs;
@@ -29,9 +30,8 @@ const Subscription = struct {
client_id: usize,
sid: []const u8,
queue_group: ?[]const u8,
- queue: *Queue(Msgs),
- // used to alloc messages in the queue
- alloc: Allocator,
+ queue_lock: *Mutex,
+ queue: *Queue(u8),
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
@@ -168,66 +168,57 @@ fn handleConnection(
const in = &reader.interface;
// Set up buffer queue
- const qbuf: []Message = try alloc.alloc(Message, 16);
+ const qbuf: []u8 = try alloc.alloc(u8, r_buf_size);
defer alloc.free(qbuf);
- var recv_queue: Queue(Message) = .init(qbuf);
+ var recv_queue: Queue(u8) = .init(qbuf);
defer recv_queue.close(io);
- const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
- defer alloc.free(mbuf);
- var msgs_queue: Queue(Msgs) = .init(mbuf);
- defer {
- msgs_queue.close(io);
- while (msgs_queue.getOne(io)) |msg| {
- switch (msg) {
- .MSG => |m| m.deinit(alloc),
- .HMSG => |h| h.deinit(alloc),
- }
- } else |_| {}
- }
-
// Create client
- var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out);
+ var client: Client = .init(null, &recv_queue, in, out);
defer client.deinit(server_allocator);
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client
- // try recv_queue.putOne(io, .PONG);
- try recv_queue.putOne(io, .{ .INFO = server.info });
+ _ = try out.write("INFO ");
+ try std.json.Stringify.value(server.info, .{}, out);
+ _ = try out.write("\r\n");
+ try out.flush();
var client_task = try io.concurrent(Client.start, .{ &client, io });
defer client_task.cancel(io) catch {};
- while (client.next(server_allocator)) |ctrl| {
+ while (client.next()) |ctrl| {
switch (ctrl) {
.PING => {
// Respond to ping with pong.
try client.recv_queue_write_lock.lock(io);
defer client.recv_queue_write_lock.unlock(io);
- try client.send(io, "PONG\r\n");
+ _ = try client.from_client.take(2);
+ try client.recv_queue.putAll(io, "PONG\r\n");
+ // try client.send(io, "PONG\r\n");
},
.PUB => {
@branchHint(.likely);
- try server.publishMessage(io, server_allocator, &client, msg);
+ // log.debug("received a pub msg", .{});
+ try server.publishMessage(io, server_allocator, &client, .@"pub");
},
.HPUB => {
@branchHint(.likely);
- try server.publishMessage(io, server_allocator, &client, msg);
+ try server.publishMessage(io, server_allocator, &client, .hpub);
},
.SUB => {
- try server.subscribe(io, server_allocator, client, id, sub);
+ try server.subscribe(io, server_allocator, &client, id);
},
.UNSUB => {
- defer unsub.deinit(server_allocator);
- try server.unsubscribe(io, server_allocator, id, unsub);
+ try server.unsubscribe(io, server_allocator, client, id);
},
.CONNECT => {
if (client.connect) |*current| {
current.deinit(server_allocator);
}
- client.connect = connect;
+ client.connect = try parse.connect(server_allocator, client.from_client);
},
else => |e| {
panic("Unimplemented message: {any}\n", .{e});
@@ -279,19 +270,26 @@ fn publishMessage(
io: Io,
alloc: Allocator,
source_client: *Client,
- msg: Message,
+ comptime pub_or_hpub: enum { @"pub", hpub },
) !void {
defer if (source_client.connect) |c| {
if (c.verbose) {
- source_client.send(io, .@"+OK") catch {};
+ if (source_client.recv_queue_write_lock.lock(io)) |_| {
+ defer source_client.recv_queue_write_lock.unlock(io);
+ source_client.recv_queue.putAll(io, "+OK\r\n") catch {};
+ } else |_| {}
}
};
- const subject = switch (msg) {
- .PUB => |pb| pb.subject,
- .HPUB => |hp| hp.@"pub".subject,
- else => unreachable,
- };
+ _ = pub_or_hpub;
+
+ const msg = try parse.@"pub"(source_client.from_client);
+
+ // const subject = switch (pub_or_hpub) {
+ // .PUB => |pb| pb.subject,
+ // .HPUB => |hp| hp.@"pub".subject,
+ // else => unreachable,
+ // };
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
var published_queue_groups: ArrayList([]const u8) = .empty;
@@ -301,7 +299,7 @@ fn publishMessage(
subs: for (0..server.subscriptions.items.len) |i| {
const subscription = server.subscriptions.items[i];
- if (subjectMatches(subscription.subject, subject)) {
+ if (subjectMatches(subscription.subject, msg.subject)) {
if (subscription.queue_group) |sg| {
for (published_queue_groups.items) |g| {
if (eql(u8, g, sg)) {
@@ -314,19 +312,46 @@ fn publishMessage(
// to prioritize other subscriptions in the queue next time.
try published_queue_sub_idxs.append(alloc, i);
}
- switch (msg) {
- .PUB => |pb| {
- try subscription.queue.putOne(io, .{
- .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
- });
- },
- .HPUB => |hp| {
- try subscription.queue.putOne(io, .{
- .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
- });
+
+ const m = msg.toMsg(subscription.sid);
+ var msg_line_buf: [1024]u8 = undefined;
+ var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf);
+
+ // try self.to_client.print(
+ // ,
+
+ // );
+ // try m.payload.write(self.to_client);
+ // try self.to_client.print("\r\n", .{});
+ try msg_line_writer.print(
+ "MSG {s} {s} {s} {d}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
},
- else => unreachable,
- }
+ );
+
+ try subscription.queue_lock.lock(io);
+ defer subscription.queue_lock.unlock(io);
+ try subscription.queue.putAll(io, msg_line_writer.buffered());
+ try subscription.queue.putAll(io, m.payload);
+ try subscription.queue.putAll(io, "\r\n");
+
+ // switch (msg) {
+ // .PUB => |pb| {
+ // try subscription.queue.putOne(io, .{
+ // .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
+ // });
+ // },
+ // .HPUB => |hp| {
+ // try subscription.queue.putOne(io, .{
+ // .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
+ // });
+ // },
+ // else => unreachable,
+ // }
}
}
@@ -340,10 +365,11 @@ fn subscribe(
server: *Server,
io: Io,
gpa: Allocator,
- client: Client,
+ client: *Client,
id: usize,
- msg: Message.Sub,
+ // msg: Message.Sub,
) !void {
+ const msg = try parse.sub(client.from_client);
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject);
@@ -357,8 +383,8 @@ fn subscribe(
.client_id = id,
.sid = sid,
.queue_group = queue_group,
- .queue = client.msg_queue,
- .alloc = client.alloc,
+ .queue_lock = &client.recv_queue_write_lock,
+ .queue = client.recv_queue,
});
}
@@ -366,9 +392,10 @@ fn unsubscribe(
server: *Server,
io: Io,
gpa: Allocator,
+ client: Client,
id: usize,
- msg: Message.Unsub,
) !void {
+ const msg = try parse.unsub(client.from_client);
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
const len = server.subscriptions.items.len;