# path: lib/pleroma/web/fed_sockets/outgoing_handler.ex
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.FedSockets.OutgoingHandler do
  @moduledoc """
  GenServer that owns a single outgoing FedSocket websocket connection.

  The connection is opened with `:gun` from `init/1`, so this process is the
  one that receives the `:gun_*` messages for it. Incoming text frames are
  handed to `FedSocket.receive_package/2`; outgoing data is pushed by sending
  this process a `{:send, data}` message.
  """

  use GenServer

  require Logger

  alias Pleroma.Application
  alias Pleroma.Web.ActivityPub.InternalFetchActor
  alias Pleroma.Web.FedSockets
  alias Pleroma.Web.FedSockets.FedRegistry
  alias Pleroma.Web.FedSockets.FedSocket
  alias Pleroma.Web.FedSockets.SocketInfo

  # Maximum time (ms) to wait for the remote to confirm the websocket upgrade.
  @upgrade_timeout 15_000

  @doc """
  Starts an (unnamed) handler process that connects to the instance behind `uri`.
  """
  def start_link(uri) do
    GenServer.start_link(__MODULE__, %{uri: uri})
  end

  # Opens the connection synchronously. On success the result of
  # `FedRegistry.add_fed_socket/2` is returned to GenServer as the init result
  # (presumably `{:ok, socket_info}` - confirm against FedRegistry). On failure
  # the process is not started at all (`:ignore`).
  @impl true
  def init(%{uri: uri}) do
    case initiate_connection(uri) do
      {:ok, ws_origin, conn_pid} ->
        FedRegistry.add_fed_socket(ws_origin, conn_pid)

      {:error, reason} ->
        Logger.debug("Outgoing connection failed - #{inspect(reason)}")
        :ignore
    end
  end

  # Incoming text frame: refresh the socket's activity marker, let FedSocket
  # process the payload and send back a reply frame when one is produced.
  @impl true
  def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
    socket_info = SocketInfo.touch(socket_info)

    case FedSocket.receive_package(socket_info, data) do
      {:noreply, _} ->
        {:noreply, socket_info}

      {:reply, reply} ->
        :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
        {:noreply, socket_info}

      {:error, reason} ->
        Logger.error("incoming error - receive_package: #{inspect(reason)}")
        {:noreply, socket_info}
    end
  end

  # Request to close the socket. `{:close, state}` is not a valid GenServer
  # return value (it would crash the process with a bad-return error), so we
  # send the close frame ourselves when a connection pid is known and then
  # stop normally.
  def handle_info(:close, state) do
    Logger.debug("Sending close frame !!!!!!!")

    with %{conn_pid: conn_pid} when is_pid(conn_pid) <- state do
      :gun.ws_send(conn_pid, :close)
    end

    {:stop, :normal, state}
  end

  # The remote end closed the connection: nothing left to do for this process.
  def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
    {:stop, :normal, state}
  end

  # Outgoing data pushed to this process; forwarded as a text frame.
  def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
    socket_info = SocketInfo.touch(socket_info)
    :gun.ws_send(conn_pid, {:text, data})
    {:noreply, socket_info}
  end

  # Pong frames carry no work; hibernate until the next message arrives.
  def handle_info({:gun_ws, _, _, :pong}, state) do
    {:noreply, state, :hibernate}
  end

  # Catch-all so unexpected messages are logged instead of crashing the process.
  def handle_info(msg, state) do
    Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
    {:noreply, state}
  end

  # Only logs; GenServer ignores the return value of terminate/2.
  @impl true
  def terminate(reason, state) do
    Logger.debug(
      "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
    )

    {:ok, state}
  end

  @doc """
  Opens a gun connection to the FedSocket endpoint derived from `uri` and
  upgrades it to a websocket.

  The calling process becomes the receiver of the connection's `:gun_*`
  messages, and this function blocks in a `receive` while waiting for the
  upgrade confirmation.

  Returns `{:ok, ws_uri, conn_pid}` on success and `{:error, reason}` otherwise
  (`:fedsockets_not_supported` when the probe request answers 404, `:timeout`
  when the upgrade is not confirmed in time). On any failure after the
  connection was opened, the connection is closed again before returning.
  """
  def initiate_connection(uri) do
    ws_uri =
      uri
      |> SocketInfo.origin()
      |> FedSockets.uri_for_origin()

    %{host: host, port: port, path: path} = URI.parse(ws_uri)

    case :gun.open(to_charlist(host), port, %{protocols: [:http]}) do
      {:ok, conn_pid} ->
        case upgrade_to_websocket(conn_pid, to_charlist(path), uri) do
          :ok ->
            {:ok, ws_uri, conn_pid}

          {:error, _reason} = error ->
            # Don't leave a half-set-up connection behind: without this the gun
            # process stays alive until the calling process itself exits.
            :gun.close(conn_pid)
            error
        end

      e ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, e}
    end
  end

  # Probes the endpoint with a plain GET (a 204 is the expected answer) and then
  # requests the websocket upgrade with signed headers.
  # Returns :ok | {:error, reason}.
  defp upgrade_to_websocket(conn_pid, path, uri) do
    with {:ok, _} <- :gun.await_up(conn_pid),
         reference =
           :gun.get(conn_pid, path, [
             {'user-agent', to_charlist(Application.user_agent())}
           ]),
         {:response, :fin, 204, _} <- :gun.await(conn_pid, reference) do
      headers = build_headers(uri)
      ref = :gun.ws_upgrade(conn_pid, path, headers, %{silence_pings: false})
      await_upgrade(conn_pid, ref, uri)
    else
      {:response, :nofin, 404, _} ->
        {:error, :fedsockets_not_supported}

      e ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, e}
    end
  end

  # Waits for the outcome of the upgrade request identified by `ref`.
  # Besides the success message, gun reports a refused upgrade as a regular
  # response and an internal failure as :gun_error; matching those lets us fail
  # right away instead of sitting out the whole timeout.
  defp await_upgrade(conn_pid, ref, uri) do
    receive do
      {:gun_upgrade, ^conn_pid, ^ref, ["websocket"], _} ->
        :ok

      {:gun_response, ^conn_pid, ^ref, _is_fin, status, _headers} ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, {:upgrade_refused, status}}

      {:gun_error, ^conn_pid, ^ref, reason} ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, reason}
    after
      @upgrade_timeout ->
        Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
        {:error, :timeout}
    end
  end

  # Builds the header list for the upgrade request: an HTTP signature made with
  # the internal fetch actor over the handshake payload from `FedSocket.shake/0`,
  # plus the headers that signature covers.
  defp build_headers(uri) do
    host_for_sig = uri |> URI.parse() |> host_signature()

    shake = FedSocket.shake()
    digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
    date = Pleroma.Signature.signed_date()
    content_length = shake |> byte_size() |> Integer.to_charlist()

    signature_opts = %{
      "(request-target)": shake,
      "content-length": content_length,
      date: date,
      digest: digest,
      host: host_for_sig
    }

    signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)

    [
      {'signature', to_charlist(signature)},
      {'date', date},
      {'digest', to_charlist(digest)},
      {'content-length', content_length},
      {to_charlist("(request-target)"), to_charlist(shake)},
      {'user-agent', to_charlist(Application.user_agent())}
    ]
  end

  # Host value used in the signature: the port is only appended when it is not
  # the default port for the URI's scheme.
  defp host_signature(%{host: host, scheme: scheme, port: port}) do
    if port == URI.default_port(scheme) do
      host
    else
      "#{host}:#{port}"
    end
  end
end