summaryrefslogtreecommitdiff
path: root/src/Connection.zig
blob: fd201e9561c406c5c2d706496b7992df7bdd5ed6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/// Returns a connection type that exchanges Saprus connection messages over a
/// raw socket. Stream data written through the nested `Writer` is split into
/// chunks of at most `cs` raw bytes, each sent as its own message.
///
/// `cs` must be non-zero; a zero chunk size could never make progress.
pub fn Chunked(comptime cs: usize) type {
    if (cs == 0) @compileError("Chunked requires a non-zero chunk size");

    return struct {
        socket: RawSocket,
        headers: EthIpUdp,
        connection: SaprusMessage,

        const Self = @This();

        /// Maximum number of raw (pre-encoding) bytes carried by one message
        /// emitted through `Writer`.
        pub const chunk_size = cs;

        /// Number of leading bytes skipped in every received frame before the
        /// Saprus message is parsed.
        /// NOTE(review): presumably the Ethernet + IPv4 + UDP header size
        /// (14 + 20 + 8) — confirm against `EthIpUdp`.
        const frame_header_len = 42;

        /// Bundles the socket, the header template and the connection message
        /// template into a connection value. Nothing is sent or received here.
        pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Self {
            return .{
                .socket = socket,
                .headers = headers,
                .connection = connection,
            };
        }

        /// Receives one frame into `buf`, parses the bytes after the frame
        /// header as a Saprus message and returns its connection payload.
        ///
        /// `io` is currently unused. Returns `error.FrameTooShort` when the
        /// received frame is shorter than the header that is skipped, plus any
        /// error from the socket or the message parser.
        /// NOTE(review): the returned slice presumably points into `buf` and
        /// is only valid while `buf` is — confirm against `Message.parse`.
        pub fn next(self: Self, io: Io, buf: []u8) ![]const u8 {
            _ = io;
            log.debug("Awaiting connection message", .{});
            const res = try self.socket.receive(buf);
            log.debug("Received {} byte connection message", .{res.len});

            // Frames come straight off the wire; never slice past their end.
            if (res.len < frame_header_len) return error.FrameTooShort;

            const msg: SaprusMessage = try .parse(res[frame_header_len..]);
            // NOTE(review): this assumes the parsed message is a connection
            // message; other message kinds are not handled here.
            const connection_res = msg.connection;

            log.debug("Payload was {s}", .{connection_res.payload});

            return connection_res.payload;
        }

        /// Sends `buf` as the payload of a single connection message.
        ///
        /// The message is serialized into a 2048-byte stack buffer, the IP id
        /// is set to a fresh random value, the header payload length is
        /// updated, and headers plus message are assembled in a second
        /// 2048-byte buffer before being handed to the socket. Fails if the
        /// assembled frame does not fit that buffer or if the socket reports
        /// an error.
        ///
        /// NOTE(review): `self.connection.connection.payload` keeps pointing
        /// at `buf` after this returns; do not read it once `buf` is gone.
        pub fn send(self: *Self, io: Io, buf: []const u8) !void {
            const io_source: std.Random.IoSource = .{ .io = io };
            const rand = io_source.interface();

            log.debug("Sending connection message", .{});

            self.connection.connection.payload = buf;
            var connection_bytes_buf: [2048]u8 = undefined;
            const connection_bytes = self.connection.toBytes(&connection_bytes_buf);

            self.headers.ip.id = rand.int(u16);
            self.headers.setPayloadLen(connection_bytes.len);

            var msg_buf: [2048]u8 = undefined;
            var msg_w: Io.Writer = .fixed(&msg_buf);
            try msg_w.writeAll(&self.headers.toBytes());
            try msg_w.writeAll(connection_bytes);
            const full_msg = msg_w.buffered();

            try self.socket.send(full_msg);

            log.debug("Sent {} byte connection message", .{full_msg.len});
        }

        /// `std.Io.Writer` adapter that base64-encodes everything written to
        /// it in chunks of at most `chunk_size` raw bytes and sends each chunk
        /// as one connection message, pausing briefly after every message.
        ///
        /// `drain` recovers the `Writer` from a pointer to its `interface`
        /// field, so always use `&writer.interface` of a `Writer` that stays
        /// at a fixed address. When a write fails with `error.WriteFailed`,
        /// `err` holds the underlying cause.
        pub const Writer = struct {
            connection: *Self,
            io: Io,
            interface: Io.Writer,
            err: ?anyerror,

            const encoder = std.base64.standard.Encoder;
            /// Exact base64 size of a full chunk, so the encode buffer is
            /// large enough for every legal `chunk_size`.
            const encoded_capacity = encoder.calcSize(chunk_size);
            /// Pause after each sent chunk, in milliseconds.
            const inter_chunk_delay_ms = 40;

            /// Creates a writer that sends through `connection` and uses `buf`
            /// as the interface's staging buffer.
            pub fn init(io: Io, connection: *Self, buf: []u8) Writer {
                return .{
                    .connection = connection,
                    .io = io,
                    .interface = .{
                        .vtable = &.{
                            .drain = drain,
                        },
                        .buffer = buf,
                    },
                    .err = null,
                };
            }

            /// `std.Io.Writer` drain implementation: sends the buffered bytes
            /// first, then every element of `data`, with the last element
            /// repeated `splat` times. Returns the number of bytes consumed
            /// from `data` (buffered bytes are not counted).
            pub fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
                const self: *Writer = @alignCast(@fieldParentPtr("interface", io_w));

                // Flush what the interface has staged and mark it consumed so
                // it is not sent again on the next drain.
                try self.sendChunked(io_w.buffered());
                io_w.end = 0;

                if (data.len == 0) return 0;

                var consumed: usize = 0;
                for (data[0 .. data.len - 1]) |slice| {
                    try self.sendChunked(slice);
                    consumed += slice.len;
                }

                const pattern = data[data.len - 1];
                if (pattern.len == 1) {
                    // Pack repeated single bytes into full chunks instead of
                    // sending one message per byte.
                    try self.sendRepeatedByte(pattern[0], splat);
                } else if (pattern.len > 0) {
                    for (0..splat) |_| try self.sendChunked(pattern);
                }
                consumed += pattern.len * splat;

                return consumed;
            }

            /// Splits `bytes` into pieces of at most `chunk_size` and sends
            /// each piece as its own message.
            fn sendChunked(self: *Writer, bytes: []const u8) Io.Writer.Error!void {
                var offset: usize = 0;
                while (offset < bytes.len) {
                    const len = @min(chunk_size, bytes.len - offset);
                    try self.sendChunk(bytes[offset..][0..len]);
                    offset += len;
                }
            }

            /// Sends `count` copies of `byte`, packed into full chunks.
            fn sendRepeatedByte(self: *Writer, byte: u8, count: usize) Io.Writer.Error!void {
                var fill: [chunk_size]u8 = undefined;
                const fill_len = @min(chunk_size, count);
                @memset(fill[0..fill_len], byte);

                var remaining = count;
                while (remaining > 0) {
                    const len = @min(chunk_size, remaining);
                    try self.sendChunk(fill[0..len]);
                    remaining -= len;
                }
            }

            /// Base64-encodes one chunk (at most `chunk_size` bytes), sends it
            /// and then waits `inter_chunk_delay_ms`. Any failure is recorded
            /// in `err` and reported as `error.WriteFailed`.
            fn sendChunk(self: *Writer, chunk: []const u8) Io.Writer.Error!void {
                var encoded_buf: [encoded_capacity]u8 = undefined;
                const encoded = encoder.encode(&encoded_buf, chunk);

                self.connection.send(self.io, encoded) catch |err| {
                    self.err = err;
                    return error.WriteFailed;
                };
                self.io.sleep(.fromMilliseconds(inter_chunk_delay_ms), .boot) catch |err| {
                    self.err = err;
                    return error.WriteFailed;
                };
            }
        };
    };
}

/// `Chunked` instantiated with `RawSocket.max_payload_len` as its chunk size.
pub const Default = Chunked(RawSocket.max_payload_len);

const std = @import("std");
const Io = std.Io;

const log = std.log;

const SaprusMessage = @import("./message.zig").Message;

const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
const RawSocket = @import("./RawSocket.zig");