summaryrefslogtreecommitdiff
path: root/src/Connection.zig
diff options
context:
space:
mode:
Diffstat (limited to 'src/Connection.zig')
-rw-r--r--src/Connection.zig182
1 files changed, 134 insertions, 48 deletions
diff --git a/src/Connection.zig b/src/Connection.zig
index 95805de..fd201e9 100644
--- a/src/Connection.zig
+++ b/src/Connection.zig
@@ -1,57 +1,143 @@
-socket: RawSocket,
-headers: EthIpUdp,
-connection: SaprusMessage,
-
-const Connection = @This();
-
-pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Connection {
- return .{
- .socket = socket,
- .headers = headers,
- .connection = connection,
+pub fn Chunked(comptime cs: usize) type {
+ return struct {
+ socket: RawSocket,
+ headers: EthIpUdp,
+ connection: SaprusMessage,
+
+ const Self = @This();
+
+ pub const chunk_size = cs;
+
+ pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Self {
+ return .{
+ .socket = socket,
+ .headers = headers,
+ .connection = connection,
+ };
+ }
+
+ pub fn next(self: Self, io: Io, buf: []u8) ![]const u8 {
+ _ = io;
+ log.debug("Awaiting connection message", .{});
+ const res = try self.socket.receive(buf);
+ log.debug("Received {} byte connection message", .{res.len});
+ const msg: SaprusMessage = try .parse(res[42..]);
+ const connection_res = msg.connection;
+
+ log.debug("Payload was {s}", .{connection_res.payload});
+
+ return connection_res.payload;
+ }
+
+ pub fn send(self: *Self, io: Io, buf: []const u8) !void {
+ const io_source: std.Random.IoSource = .{ .io = io };
+ const rand = io_source.interface();
+
+ log.debug("Sending connection message", .{});
+
+ self.connection.connection.payload = buf;
+ var connection_bytes_buf: [2048]u8 = undefined;
+ const connection_bytes = self.connection.toBytes(&connection_bytes_buf);
+
+ self.headers.ip.id = rand.int(u16);
+ self.headers.setPayloadLen(connection_bytes.len);
+
+ var msg_buf: [2048]u8 = undefined;
+ var msg_w: Io.Writer = .fixed(&msg_buf);
+ try msg_w.writeAll(&self.headers.toBytes());
+ try msg_w.writeAll(connection_bytes);
+ const full_msg = msg_w.buffered();
+
+ try self.socket.send(full_msg);
+
+ log.debug("Sent {} byte connection message", .{full_msg.len});
+ }
+
+ pub const Writer = struct {
+ connection: *Self,
+ io: Io,
+ interface: Io.Writer,
+ err: ?anyerror,
+
+ pub fn init(io: Io, connection: *Self, buf: []u8) Writer {
+ return .{
+ .connection = connection,
+ .io = io,
+ .interface = .{
+ .vtable = &.{
+ .drain = drain,
+ },
+ .buffer = buf,
+ },
+ .err = null,
+ };
+ }
+
+ pub fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
+ _ = splat;
+ const self: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
+ var res: usize = 0;
+
+ // Get buffered data from the writer
+ const buffered = io_w.buffered();
+ var buf_offset: usize = 0;
+
+ // Process buffered data in chunks
+ while (buf_offset < buffered.len) {
+ const current_chunk_size = @min(chunk_size, buffered.len - buf_offset);
+ const chunk = buffered[buf_offset..][0..current_chunk_size];
+
+ // Base64 encode the chunk
+ var encoded_buf: [chunk_size * 2]u8 = undefined;
+ const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
+ const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
+
+ // Send encoded chunk
+ self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
+ self.err = err;
+ return error.WriteFailed;
+ };
+ self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
+
+ buf_offset += current_chunk_size;
+ res += current_chunk_size;
+ }
+
+ // Process data slices
+ for (data) |slice| {
+ var slice_offset: usize = 0;
+
+ while (slice_offset < slice.len) {
+ const current_chunk_size = @min(chunk_size, slice.len - slice_offset);
+ const chunk = slice[slice_offset..][0..current_chunk_size];
+
+ // Base64 encode the chunk
+ var encoded_buf: [chunk_size * 2]u8 = undefined;
+ const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
+ const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
+
+ // Send encoded chunk
+ self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
+ self.err = err;
+ return error.WriteFailed;
+ };
+ self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
+
+ slice_offset += current_chunk_size;
+ res += current_chunk_size;
+ }
+ }
+
+ return res;
+ }
+ };
};
}
-pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 {
- _ = io;
- log.debug("Awaiting connection message", .{});
- const res = try self.socket.receive(buf);
- log.debug("Received {} byte connection message", .{res.len});
- const msg: SaprusMessage = try .parse(res[42..]);
- const connection_res = msg.connection;
-
- log.debug("Payload was {s}", .{connection_res.payload});
-
- return connection_res.payload;
-}
-
-pub fn send(self: *Connection, io: Io, buf: []const u8) !void {
- const io_source: std.Random.IoSource = .{ .io = io };
- const rand = io_source.interface();
-
- log.debug("Sending connection message", .{});
-
- self.connection.connection.payload = buf;
- var connection_bytes_buf: [2048]u8 = undefined;
- const connection_bytes = self.connection.toBytes(&connection_bytes_buf);
-
- self.headers.ip.id = rand.int(u16);
- self.headers.setPayloadLen(connection_bytes.len);
-
- var msg_buf: [2048]u8 = undefined;
- var msg_w: Writer = .fixed(&msg_buf);
- try msg_w.writeAll(&self.headers.toBytes());
- try msg_w.writeAll(connection_bytes);
- const full_msg = msg_w.buffered();
-
- try self.socket.send(full_msg);
-
- log.debug("Sent {} byte connection message", .{full_msg.len});
-}
+pub const Default = Chunked(RawSocket.max_payload_len);
const std = @import("std");
const Io = std.Io;
-const Writer = std.Io.Writer;
const log = std.log;