diff options
Diffstat (limited to 'src/server/Server.zig')
| -rw-r--r-- | src/server/Server.zig | 172 |
1 files changed, 117 insertions, 55 deletions
diff --git a/src/server/Server.zig b/src/server/Server.zig index eaecdf2..f7b849c 100644 --- a/src/server/Server.zig +++ b/src/server/Server.zig @@ -4,6 +4,7 @@ const ArrayList = std.ArrayList; const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged; const Io = std.Io; +const Dir = Io.Dir; const Group = Io.Group; const IpAddress = std.Io.net.IpAddress; const Mutex = Io.Mutex; @@ -21,6 +22,7 @@ pub const Subscription = struct { subject: []const u8, client_id: usize, sid: []const u8, + queue: *Queue(Message), fn deinit(self: Subscription, alloc: Allocator) void { alloc.free(self.subject); @@ -63,13 +65,24 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void { var client_group: Group = .init; defer client_group.cancel(io); + const read_buffer_size, const write_buffer_size = getBufferSizes(io); + log.debug("read buf: {d} write buf: {d}", .{ read_buffer_size, write_buffer_size }); + var id: usize = 0; while (true) : (id +%= 1) { if (server.clients.contains(id)) continue; log.debug("Accepting next client", .{}); const stream = try tcp_server.accept(io); log.debug("Accepted connection {d}", .{id}); - _ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch { + _ = client_group.concurrent(io, handleConnectionInfallible, .{ + server, + gpa, + io, + id, + stream, + read_buffer_size, + write_buffer_size, + }) catch { log.err("Could not start concurrent handler for {d}", .{id}); stream.close(io); }; @@ -96,13 +109,29 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void { } } -fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void { - handleConnection(server, server_allocator, io, id, stream) catch |err| { +fn handleConnectionInfallible( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + r_buf_size: usize, + w_buf_size: usize, +) void { + handleConnection(server, server_allocator, io, id, stream, r_buf_size, w_buf_size) catch |err| { log.err("Failed processing client {d}: {any}", .{ id, err }); }; } -fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void { +fn handleConnection( + server: *Server, + server_allocator: Allocator, + io: Io, + id: usize, + stream: Stream, + r_buf_size: usize, + w_buf_size: usize, +) !void { defer stream.close(io); // TODO: use a client allocator for things that should only live for as long as the client? @@ -111,26 +140,27 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us // messages when done processing them (usually outside the client process, ie: publish). // Set up client writer - // TODO: how many bytes can fit in a network write syscall? cat /proc/sys/net/core/wmem_max - var w_buffer: [212992]u8 = undefined; - var writer = stream.writer(io, &w_buffer); + const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size); + defer server_allocator.free(w_buffer); + var writer = stream.writer(io, w_buffer); const out = &writer.interface; // Set up client reader - // TODO: how many bytes can fit in a network read syscall? cat /proc/sys/net/core/rmem_max - var r_buffer: [212992]u8 = undefined; - var reader = stream.reader(io, &r_buffer); + const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size); + defer server_allocator.free(r_buffer); + var reader = stream.reader(io, r_buffer); const in = &reader.interface; // Set up buffer queue - var qbuf: [r_buffer.len / @sizeOf(Message)]Message = undefined; - var queue: Queue(Message) = .init(&qbuf); + const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message)); + defer server_allocator.free(qbuf); + var queue: Queue(Message) = .init(qbuf); defer { queue.close(io); while (queue.getOne(io)) |msg| { switch (msg) { - .msg => |m| m.deinit(server_allocator), - .hmsg => |h| h.deinit(server_allocator), + .MSG => |m| m.deinit(server_allocator), + .HMSG => |h| h.deinit(server_allocator), else => {}, } } else |_| {} @@ -144,7 +174,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us defer server.removeClient(io, server_allocator, id); // Do initial handshake with client - try queue.putOne(io, .{ .info = server.info }); + try queue.putOne(io, .{ .INFO = server.info }); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); defer client_task.cancel(io) catch {}; @@ -152,27 +182,28 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us // Messages are owned by the server after they are received from the client while (client.next(server_allocator)) |msg| { switch (msg) { - .ping => { + .PING => { // Respond to ping with pong. - try client.send(io, .pong); + try client.send(io, .PONG); }, - .@"pub", .hpub => { + .PUB, .HPUB => { + @branchHint(.likely); defer switch (msg) { - .@"pub" => |pb| pb.deinit(server_allocator), - .hpub => |hp| hp.deinit(server_allocator), + .PUB => |pb| pb.deinit(server_allocator), + .HPUB => |hp| hp.deinit(server_allocator), else => unreachable, }; try server.publishMessage(io, server_allocator, &client, msg); }, - .sub => |sub| { + .SUB => |sub| { defer sub.deinit(server_allocator); - try server.subscribe(io, server_allocator, id, sub); + try server.subscribe(io, server_allocator, id, &queue, sub); }, - .unsub => |unsub| { + .UNSUB => |unsub| { defer unsub.deinit(server_allocator); try server.unsubscribe(io, server_allocator, id, unsub); }, - .connect => |connect| { + .CONNECT => |connect| { if (client.connect) |*current| { current.deinit(server_allocator); } @@ -227,53 +258,46 @@ test subjectMatches { } fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { - errdefer { - if (source_client.connect) |c| { - if (c.verbose) { - source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; - } + defer if (source_client.connect) |c| { + if (c.verbose) { + source_client.send(io, .@"+OK") catch {}; } - } + }; + const subject = switch (msg) { - .@"pub" => |pb| pb.subject, - .hpub => |hp| hp.@"pub".subject, + .PUB => |pb| pb.subject, + .HPUB => |hp| hp.@"pub".subject, else => unreachable, }; try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); for (server.subscriptions.items) |subscription| { if (subjectMatches(subscription.subject, subject)) { - const client = server.clients.get(subscription.client_id) orelse { - log.debug("Trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); - continue; - }; - switch (msg) { - .@"pub" => |pb| client.send(io, .{ - .msg = try pb.toMsg(alloc, subscription.sid), - }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, + .PUB => |pb| { + try subscription.queue.putOne(io, .{ + .MSG = try pb.toMsg(alloc, subscription.sid), + }); }, - .hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( - alloc, - subscription.sid, - ) }) catch |err| switch (err) { - error.Canceled => return err, - else => {}, + .HPUB => |hp| { + try subscription.queue.putOne(io, .{ + .HMSG = try hp.toHMsg(alloc, subscription.sid), + }); }, else => unreachable, } } } - if (source_client.connect) |c| { - if (c.verbose) { - source_client.send(io, .@"+ok") catch {}; - } - } } -fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Sub) !void { +fn subscribe( + server: *Server, + io: Io, + gpa: Allocator, + id: usize, + queue: *Queue(Message), + msg: Message.Sub, +) !void { try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const subject = try gpa.dupe(u8, msg.subject); @@ -284,10 +308,17 @@ fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Su .subject = subject, .client_id = id, .sid = sid, + .queue = queue, }); } -fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Unsub) !void { +fn unsubscribe( + server: *Server, + io: Io, + gpa: Allocator, + id: usize, + msg: Message.Unsub, +) !void { try server.subs_lock.lock(io); defer server.subs_lock.unlock(io); const len = server.subscriptions.items.len; @@ -301,5 +332,36 @@ fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message. } } +const parseUnsigned = std.fmt.parseUnsigned; + +fn getBufferSizes(io: Io) struct { usize, usize } { + const default_size = 4 * 1024; + const default = .{ default_size, default_size }; + + const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { + log.err("couldn't open /proc/sys/net/core", .{}); + return default; + }; + + var buf: [64]u8 = undefined; + + const rmem_max = readBufferSize(io, dir, "rmem_max", &buf, default_size); + const wmem_max = readBufferSize(io, dir, "wmem_max", &buf, default_size); + + return .{ rmem_max, wmem_max }; +} + +fn readBufferSize(io: Io, dir: anytype, filename: []const u8, buf: []u8, default: usize) usize { + const bytes = dir.readFile(io, filename, buf) catch |err| { + log.err("couldn't open {s}: {any}", .{ filename, err }); + return default; + }; + + return parseUnsigned(usize, bytes[0 .. bytes.len - 1], 10) catch |err| { + log.err("couldn't parse {s}: {any}", .{ bytes[0 .. bytes.len - 1], err }); + return default; + }; +} + pub const default_id = "server-id-123"; pub const default_name = "Zits Server"; |
