summaryrefslogtreecommitdiff
path: root/lib/pleroma/web/streamer/streamer.ex
blob: b7294d084e4ced07fb145b2d89640466bf225b6c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Entry point for streaming.

  Resolves a requested stream name into a concrete topic, registers and
  unregisters sockets for topics through `Pleroma.Web.Streamer.State`, and
  hands items over to a `:streamer_worker` poolboy worker for delivery.
  """

  alias Pleroma.User
  alias Pleroma.Web.Streamer.State
  alias Pleroma.Web.Streamer.Worker

  # Timeout handed to `:poolboy.transaction/3` in `stream/2`.
  @timeout 60_000

  # Captured at compile time so `should_send?/0` can dispatch on the build
  # environment without calling `Mix` at runtime.
  @mix_env Mix.env()

  @public_streams ["public", "public:local", "public:media", "public:local:media"]
  @user_streams ["user", "user:notification", "direct"]

  @doc """
  Expands and authorizes a stream, and registers the socket for streaming.

  The user is read from `socket.assigns.user` when the socket has that shape;
  otherwise the request is treated as anonymous (`nil` user).

  Returns `{:ok, topic}` after the socket has been added to the topic, or the
  error returned by `get_topic/3` (in which case nothing is registered).
  """
  @spec get_topic_and_add_socket(stream :: String.t(), State.t(), map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, socket, params \\ %{}) do
    # Any non-`{:ok, topic}` result falls through `with` unchanged.
    with {:ok, topic} <- get_topic(stream, socket_user(socket), params) do
      add_socket(topic, socket)
      {:ok, topic}
    end
  end

  @doc """
  Expands and authorizes a stream.

  Rules, in order of precedence:

    * public streams (`#{inspect(@public_streams)}`) are returned as-is;
    * `"hashtag"` with a binary `"tag"` param becomes `"hashtag:<tag>"`;
    * user streams (`#{inspect(@user_streams)}`) become `"<stream>:<user id>"`
      and require a `%Pleroma.User{}`, otherwise `{:error, :unauthorized}`;
    * `"list"` with a `"list"` param requires a `%Pleroma.User{}` for which
      `Pleroma.List.get/2` returns a truthy value; it yields `"list:<id>"`,
      `{:error, :bad_topic}` when the lookup returns nothing, and
      `{:error, :unauthorized}` for every other `"list"` request;
    * anything else is `{:error, :bad_topic}`.
  """
  @spec get_topic(stream :: String.t(), User.t() | nil, map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, params \\ %{})

  # Allow all public streams.
  def get_topic(stream, _, _) when stream in @public_streams do
    {:ok, stream}
  end

  # Allow all hashtag streams.
  # The guard matters: `<>` raises `ArgumentError` for a non-binary operand,
  # so a non-binary tag falls through to the catch-all `:bad_topic` clause.
  def get_topic("hashtag", _, %{"tag" => tag}) when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end

  # Expand user streams.
  def get_topic(stream, %User{} = user, _) when stream in @user_streams do
    {:ok, stream <> ":" <> to_string(user.id)}
  end

  # User streams without an authenticated user.
  def get_topic(stream, _, _) when stream in @user_streams do
    {:error, :unauthorized}
  end

  # List streams.
  def get_topic("list", %User{} = user, %{"list" => id}) do
    if Pleroma.List.get(id, user) do
      {:ok, "list:" <> to_string(id)}
    else
      {:error, :bad_topic}
    end
  end

  # List stream requested without a user or without a "list" param.
  def get_topic("list", _, _) do
    {:error, :unauthorized}
  end

  # Unknown stream name (or a known one with unusable params).
  def get_topic(_, _, _) do
    {:error, :bad_topic}
  end

  @doc "Registers `socket` for `topic`; delegates to `State.add_socket/2`."
  def add_socket(topic, socket) do
    State.add_socket(topic, socket)
  end

  @doc "Unregisters `socket` from `topic`; delegates to `State.remove_socket/2`."
  def remove_socket(topic, socket) do
    State.remove_socket(topic, socket)
  end

  @doc "Returns whatever `State.get_sockets/0` returns."
  def get_sockets do
    State.get_sockets()
  end

  @doc """
  Streams `items` to `topics` through a worker checked out of the
  `:streamer_worker` poolboy pool.

  Returns the `Task` struct started for the delivery, or `nil` when sending is
  disabled for the current build environment (see `should_send?/0`).
  """
  def stream(topics, items) do
    if should_send?() do
      # NOTE(review): the task is never awaited in this module. `Task.async/1`
      # links the task to the caller and sends it the reply message, so a
      # caller that ignores the returned task receives a stray message and is
      # taken down if the task crashes. Left unchanged because callers may
      # depend on the returned `%Task{}`.
      Task.async(fn ->
        :poolboy.transaction(
          :streamer_worker,
          &Worker.stream(&1, topics, items),
          @timeout
        )
      end)
    end
  end

  @doc "Returns the module name of the streamer supervisor."
  def supervisor, do: Pleroma.Web.Streamer.Supervisor

  # Extracts the user from a socket shaped like `%{assigns: %{user: user}}`;
  # any other shape is treated as anonymous.
  defp socket_user(%{assigns: %{user: user}}), do: user
  defp socket_user(_socket), do: nil

  # Whether `stream/2` should actually dispatch work, based on the build env.
  defp should_send? do
    handle_should_send(@mix_env)
  end

  # In tests, only send when a process registered as `:streamer_worker` exists
  # and is alive.
  defp handle_should_send(:test) do
    case Process.whereis(:streamer_worker) do
      nil ->
        false

      pid ->
        Process.alive?(pid)
    end
  end

  # Never send in the benchmark environment.
  defp handle_should_send(:benchmark), do: false

  # Every other environment always sends.
  defp handle_should_send(_), do: true
end