summaryrefslogtreecommitdiff
path: root/lib/pleroma/web/streamer/streamer.ex
blob: 814d5a7292519e881b4d81cc28139e02211fce8b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer do
  @moduledoc """
  Public entry point of the streamer.

  Socket bookkeeping is forwarded to `Pleroma.Web.Streamer.State`, while
  `stream/2` hands items to `Pleroma.Web.Streamer.Worker` through the
  `:streamer_worker` poolboy pool.
  """

  alias Pleroma.Web.Streamer.State
  alias Pleroma.Web.Streamer.Worker

  # Timeout value handed to `:poolboy.transaction/3`.
  @timeout 60_000

  # Build environment, captured once when this module is compiled.
  @mix_env Mix.env()

  @doc "Registers `socket` under `topic` by forwarding to `State.add_socket/2`."
  defdelegate add_socket(topic, socket), to: State

  @doc "Unregisters `socket` from `topic` by forwarding to `State.remove_socket/2`."
  defdelegate remove_socket(topic, socket), to: State

  @doc "Returns whatever `State.get_sockets/0` reports."
  defdelegate get_sockets(), to: State

  @doc """
  Streams `items` to `topics` through a pooled worker.

  When sending is enabled for the current build environment, the work is
  started with `Task.async/1` and the resulting `Task` struct is returned;
  otherwise nothing is started and `nil` is returned.

  NOTE(review): nothing in this module awaits the returned task, so the
  task stays linked to the caller and its reply lands in the caller's
  mailbox unless the caller awaits it. Confirm that callers expect this.
  """
  def stream(topics, items) do
    if should_send?() do
      Task.async(fn -> run_in_pool(topics, items) end)
    end
  end

  @doc "Returns the name of the streamer supervisor module."
  def supervisor do
    __MODULE__.Supervisor
  end

  # Checks a worker out of the :streamer_worker pool and lets it stream the items.
  defp run_in_pool(topics, items) do
    :poolboy.transaction(
      :streamer_worker,
      fn worker -> Worker.stream(worker, topics, items) end,
      @timeout
    )
  end

  # Whether `stream/2` should dispatch at all in the compiled environment.
  defp should_send?, do: sendable_in?(@mix_env)

  # In :test, only send when something is registered as :streamer_worker and alive.
  defp sendable_in?(:test) do
    registered_and_alive?(Process.whereis(:streamer_worker))
  end

  # Never send while benchmarking.
  defp sendable_in?(:benchmark) do
    false
  end

  # Every other environment always sends.
  defp sendable_in?(_env) do
    true
  end

  # `nil` means nothing is registered under the name.
  defp registered_and_alive?(nil), do: false
  defp registered_and_alive?(pid), do: Process.alive?(pid)
end