summaryrefslogtreecommitdiff
path: root/lib/pleroma/web/streamer.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/web/streamer.ex')
-rw-r--r--lib/pleroma/web/streamer.ex42
1 files changed, 38 insertions, 4 deletions
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index ff7f62a1e..3c0da5c27 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -37,7 +37,7 @@ defmodule Pleroma.Web.Streamer do
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
- add_socket(topic, user)
+ add_socket(topic, oauth_token)
end
end
@@ -120,10 +120,10 @@ defmodule Pleroma.Web.Streamer do
end
@doc "Registers the process for streaming. Use `get_topic/3` to get the full authorized topic."
- def add_socket(topic, user) do
+ def add_socket(topic, oauth_token) do
if should_env_send?() do
- auth? = if user, do: true
- Registry.register(@registry, topic, auth?)
+ oauth_token_id = if oauth_token, do: oauth_token.id, else: false
+ Registry.register(@registry, topic, oauth_token_id)
end
{:ok, topic}
@@ -296,6 +296,24 @@ defmodule Pleroma.Web.Streamer do
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+ defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
+ create_activity =
+ Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
+ |> Map.put(:object, item.object)
+
+ anon_render = StreamerView.render("status_update.json", create_activity)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, auth?} ->
+ if auth? do
+ send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
+ else
+ send(pid, {:text, anon_render})
+ end
+ end)
+ end)
+ end
+
defp push_to_socket(topic, item) do
anon_render = StreamerView.render("update.json", item)
@@ -320,6 +338,22 @@ defmodule Pleroma.Web.Streamer do
end
end
+ def close_streams_by_oauth_token(oauth_token) do
+ if should_env_send?() do
+ Registry.select(
+ @registry,
+ [
+ {
+ {:"$1", :"$2", :"$3"},
+ [{:==, :"$3", oauth_token.id}],
+ [:"$2"]
+ }
+ ]
+ )
+ |> Enum.each(fn pid -> send(pid, :close) end)
+ end
+ end
+
# In test environement, only return true if the registry is started.
# In benchmark environment, returns false.
# In any other environment, always returns true.