summaryrefslogtreecommitdiff
path: root/lib/pleroma/web/streamer.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/web/streamer.ex')
-rw-r--r--lib/pleroma/web/streamer.ex55
1 files changed, 34 insertions, 21 deletions
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index d618dfe54..fc3bbb130 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
@@ -36,9 +36,8 @@ defmodule Pleroma.Web.Streamer do
) ::
{:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
- case get_topic(stream, user, oauth_token, params) do
- {:ok, topic} -> add_socket(topic, user)
- error -> error
+ with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
+ add_socket(topic, user)
end
end
@@ -57,14 +56,23 @@ defmodule Pleroma.Web.Streamer do
{:ok, "hashtag:" <> tag}
end
+ # Allow remote instance streams.
+ def get_topic("public:remote", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:" <> instance}
+ end
+
+ def get_topic("public:remote:media", _user, _oauth_token, %{"instance" => instance} = _params) do
+ {:ok, "public:remote:media:" <> instance}
+ end
+
# Expand user streams.
def get_topic(
stream,
%User{id: user_id} = user,
- %Token{user_id: token_user_id} = oauth_token,
+ %Token{user_id: user_id} = oauth_token,
_params
)
- when stream in @user_streams and user_id == token_user_id do
+ when stream in @user_streams do
# Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
required_scopes =
if stream == "user:notification" do
@@ -88,10 +96,9 @@ defmodule Pleroma.Web.Streamer do
def get_topic(
"list",
%User{id: user_id} = user,
- %Token{user_id: token_user_id} = oauth_token,
+ %Token{user_id: user_id} = oauth_token,
%{"list" => id}
- )
- when user_id == token_user_id do
+ ) do
cond do
OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
{:error, :unauthorized}
@@ -128,16 +135,10 @@ defmodule Pleroma.Web.Streamer do
def stream(topics, items) do
if should_env_send?() do
- List.wrap(topics)
- |> Enum.each(fn topic ->
- List.wrap(items)
- |> Enum.each(fn item ->
- spawn(fn -> do_stream(topic, item) end)
- end)
- end)
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
end
-
- :ok
end
def filtered_by_user?(user, item, streamed_type \\ :activity)
@@ -150,9 +151,8 @@ defmodule Pleroma.Web.Streamer do
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
- with parent <- Object.normalize(item) || item,
- true <-
- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+ with parent <- Object.normalize(item, fetch: false) || item,
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <-
!(streamed_type == :activity && item.data["type"] == "Announce" &&
@@ -186,6 +186,19 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ defp do_stream("follow_relationship", item) do
+ text = StreamerView.render("follow_relationships_update.json", item)
+ user_topic = "user:#{item.follower.id}"
+
+ Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
+
+ Registry.dispatch(@registry, user_topic, fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:text, text})
+ end)
+ end)
+ end
+
defp do_stream("participation", participation) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")