diff options
Diffstat (limited to 'src/Connection.zig')
| -rw-r--r-- | src/Connection.zig | 182 |
1 files changed, 134 insertions, 48 deletions
diff --git a/src/Connection.zig b/src/Connection.zig index 95805de..fd201e9 100644 --- a/src/Connection.zig +++ b/src/Connection.zig @@ -1,57 +1,143 @@ -socket: RawSocket, -headers: EthIpUdp, -connection: SaprusMessage, - -const Connection = @This(); - -pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Connection { - return .{ - .socket = socket, - .headers = headers, - .connection = connection, +pub fn Chunked(comptime cs: usize) type { + return struct { + socket: RawSocket, + headers: EthIpUdp, + connection: SaprusMessage, + + const Self = @This(); + + pub const chunk_size = cs; + + pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Self { + return .{ + .socket = socket, + .headers = headers, + .connection = connection, + }; + } + + pub fn next(self: Self, io: Io, buf: []u8) ![]const u8 { + _ = io; + log.debug("Awaiting connection message", .{}); + const res = try self.socket.receive(buf); + log.debug("Received {} byte connection message", .{res.len}); + const msg: SaprusMessage = try .parse(res[42..]); + const connection_res = msg.connection; + + log.debug("Payload was {s}", .{connection_res.payload}); + + return connection_res.payload; + } + + pub fn send(self: *Self, io: Io, buf: []const u8) !void { + const io_source: std.Random.IoSource = .{ .io = io }; + const rand = io_source.interface(); + + log.debug("Sending connection message", .{}); + + self.connection.connection.payload = buf; + var connection_bytes_buf: [2048]u8 = undefined; + const connection_bytes = self.connection.toBytes(&connection_bytes_buf); + + self.headers.ip.id = rand.int(u16); + self.headers.setPayloadLen(connection_bytes.len); + + var msg_buf: [2048]u8 = undefined; + var msg_w: Io.Writer = .fixed(&msg_buf); + try msg_w.writeAll(&self.headers.toBytes()); + try msg_w.writeAll(connection_bytes); + const full_msg = msg_w.buffered(); + + try self.socket.send(full_msg); + + log.debug("Sent {} byte connection message", .{full_msg.len}); + } + + pub const Writer = struct { + connection: *Self, + io: Io, + interface: Io.Writer, + err: ?anyerror, + + pub fn init(io: Io, connection: *Self, buf: []u8) Writer { + return .{ + .connection = connection, + .io = io, + .interface = .{ + .vtable = &.{ + .drain = drain, + }, + .buffer = buf, + }, + .err = null, + }; + } + + pub fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize { + _ = splat; + const self: *Writer = @alignCast(@fieldParentPtr("interface", io_w)); + var res: usize = 0; + + // Get buffered data from the writer + const buffered = io_w.buffered(); + var buf_offset: usize = 0; + + // Process buffered data in chunks + while (buf_offset < buffered.len) { + const current_chunk_size = @min(chunk_size, buffered.len - buf_offset); + const chunk = buffered[buf_offset..][0..current_chunk_size]; + + // Base64 encode the chunk + var encoded_buf: [chunk_size * 2]u8 = undefined; + const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len); + const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk); + + // Send encoded chunk + self.connection.send(self.io, encoded[0..encoded_len]) catch |err| { + self.err = err; + return error.WriteFailed; + }; + self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo"); + + buf_offset += current_chunk_size; + res += current_chunk_size; + } + + // Process data slices + for (data) |slice| { + var slice_offset: usize = 0; + + while (slice_offset < slice.len) { + const current_chunk_size = @min(chunk_size, slice.len - slice_offset); + const chunk = slice[slice_offset..][0..current_chunk_size]; + + // Base64 encode the chunk + var encoded_buf: [chunk_size * 2]u8 = undefined; + const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len); + const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk); + + // Send encoded chunk + self.connection.send(self.io, encoded[0..encoded_len]) catch |err| { + self.err = err; + return error.WriteFailed; + }; + self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo"); + + slice_offset += current_chunk_size; + res += current_chunk_size; + } + } + + return res; + } + }; }; } -pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 { - _ = io; - log.debug("Awaiting connection message", .{}); - const res = try self.socket.receive(buf); - log.debug("Received {} byte connection message", .{res.len}); - const msg: SaprusMessage = try .parse(res[42..]); - const connection_res = msg.connection; - - log.debug("Payload was {s}", .{connection_res.payload}); - - return connection_res.payload; -} - -pub fn send(self: *Connection, io: Io, buf: []const u8) !void { - const io_source: std.Random.IoSource = .{ .io = io }; - const rand = io_source.interface(); - - log.debug("Sending connection message", .{}); - - self.connection.connection.payload = buf; - var connection_bytes_buf: [2048]u8 = undefined; - const connection_bytes = self.connection.toBytes(&connection_bytes_buf); - - self.headers.ip.id = rand.int(u16); - self.headers.setPayloadLen(connection_bytes.len); - - var msg_buf: [2048]u8 = undefined; - var msg_w: Writer = .fixed(&msg_buf); - try msg_w.writeAll(&self.headers.toBytes()); - try msg_w.writeAll(connection_bytes); - const full_msg = msg_w.buffered(); - - try self.socket.send(full_msg); - - log.debug("Sent {} byte connection message", .{full_msg.len}); -} +pub const Default = Chunked(RawSocket.max_payload_len); const std = @import("std"); const Io = std.Io; -const Writer = std.Io.Writer; const log = std.log; |
