summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlain <lain@soykaf.club>2020-06-05 16:47:02 +0200
committerlain <lain@soykaf.club>2020-06-05 16:47:02 +0200
commit115d08a7542b92c5e1d889da41c0ee6837a1235e (patch)
tree9f1d3aa8a65f53d5a42b9ab64013c2d9a3296a19
parent65689ba9bd44e291fc626cce2bd5136b93a5da90 (diff)
Pipeline: Add a side effects step after the transaction finishes
This is to run things like streaming notifications out, which will sometimes need data that is created by the transaction, but is streamed out asynchronously.
-rw-r--r--lib/pleroma/notification.ex26
-rw-r--r--lib/pleroma/web/activity_pub/pipeline.ex4
-rw-r--r--lib/pleroma/web/activity_pub/side_effects.ex30
-rw-r--r--test/web/activity_pub/pipeline_test.exs9
-rw-r--r--test/web/activity_pub/side_effects_test.exs86
-rw-r--r--test/web/common_api/common_api_test.exs45
6 files changed, 170 insertions, 30 deletions
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
index e5b880b10..49e27c05a 100644
--- a/lib/pleroma/notification.ex
+++ b/lib/pleroma/notification.ex
@@ -334,30 +334,34 @@ defmodule Pleroma.Notification do
end
end
- def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
+ def create_notifications(activity, options \\ [])
+
+ def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
object = Object.normalize(activity, false)
if object && object.data["type"] == "Answer" do
{:ok, []}
else
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
end
end
- def create_notifications(%Activity{data: %{"type" => type}} = activity)
+ def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
- do_create_notifications(activity)
+ do_create_notifications(activity, options)
end
- def create_notifications(_), do: {:ok, []}
+ def create_notifications(_, _), do: {:ok, []}
+
+ defp do_create_notifications(%Activity{} = activity, options) do
+ do_send = Keyword.get(options, :do_send, true)
- defp do_create_notifications(%Activity{} = activity) do
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
potential_receivers = enabled_receivers ++ disabled_receivers
notifications =
Enum.map(potential_receivers, fn user ->
- do_send = user in enabled_receivers
+ do_send = do_send && user in enabled_receivers
create_notification(activity, user, do_send)
end)
@@ -623,4 +627,12 @@ defmodule Pleroma.Notification do
end
def skip?(_, _, _), do: false
+
+ def for_user_and_activity(user, activity) do
+ from(n in __MODULE__,
+ where: n.user_id == ^user.id,
+ where: n.activity_id == ^activity.id
+ )
+ |> Repo.one()
+ end
end
diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex
index 0c54c4b23..6875c47f6 100644
--- a/lib/pleroma/web/activity_pub/pipeline.ex
+++ b/lib/pleroma/web/activity_pub/pipeline.ex
@@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
{:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
def common_pipeline(object, meta) do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
+ {:ok, {:ok, activity, meta}} ->
+ SideEffects.handle_after_transaction(meta)
+ {:ok, activity, meta}
+
{:ok, value} ->
value
diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex
index b3aacff40..10136789a 100644
--- a/lib/pleroma/web/activity_pub/side_effects.ex
+++ b/lib/pleroma/web/activity_pub/side_effects.ex
@@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Pipeline
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.Push
def handle(object, meta \\ [])
@@ -37,7 +38,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
# - Set up notifications
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
- Notification.create_notifications(activity)
+ {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
+
+ meta =
+ meta
+ |> add_notifications(notifications)
+
{:ok, activity, meta}
else
e -> Repo.rollback(e)
@@ -200,4 +206,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
+
+ defp send_notifications(meta) do
+ Keyword.get(meta, :created_notifications, [])
+ |> Enum.each(fn notification ->
+ Streamer.stream(["user", "user:notification"], notification)
+ Push.send(notification)
+ end)
+
+ meta
+ end
+
+ defp add_notifications(meta, notifications) do
+ existing = Keyword.get(meta, :created_notifications, [])
+
+ meta
+ |> Keyword.put(:created_notifications, notifications ++ existing)
+ end
+
+ def handle_after_transaction(meta) do
+ meta
+ |> send_notifications()
+ end
end
diff --git a/test/web/activity_pub/pipeline_test.exs b/test/web/activity_pub/pipeline_test.exs
index 26557720b..8deb64501 100644
--- a/test/web/activity_pub/pipeline_test.exs
+++ b/test/web/activity_pub/pipeline_test.exs
@@ -33,7 +33,10 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
{
Pleroma.Web.ActivityPub.SideEffects,
[],
- [handle: fn o, m -> {:ok, o, m} end]
+ [
+ handle: fn o, m -> {:ok, o, m} end,
+ handle_after_transaction: fn m -> m end
+ ]
},
{
Pleroma.Web.Federator,
@@ -71,7 +74,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
{
Pleroma.Web.ActivityPub.SideEffects,
[],
- [handle: fn o, m -> {:ok, o, m} end]
+ [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
},
{
Pleroma.Web.Federator,
@@ -110,7 +113,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
{
Pleroma.Web.ActivityPub.SideEffects,
[],
- [handle: fn o, m -> {:ok, o, m} end]
+ [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
},
{
Pleroma.Web.Federator,
diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs
index 40df664eb..43ffe1337 100644
--- a/test/web/activity_pub/side_effects_test.exs
+++ b/test/web/activity_pub/side_effects_test.exs
@@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
import Pleroma.Factory
import Mock
+ describe "handle_after_transaction" do
+ test "it streams out notifications" do
+ author = insert(:user, local: true)
+ recipient = insert(:user, local: true)
+
+ {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey")
+
+ {:ok, create_activity_data, _meta} =
+ Builder.create(author, chat_message_data["id"], [recipient.ap_id])
+
+ {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
+
+ {:ok, _create_activity, meta} =
+ SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+
+ assert [notification] = meta[:created_notifications]
+
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ -> nil end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ SideEffects.handle_after_transaction(meta)
+
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+ assert called(Pleroma.Web.Push.send(notification))
+ end
+ end
+ end
+
describe "delete objects" do
setup do
user = insert(:user)
@@ -361,22 +402,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
- {:ok, _create_activity, _meta} =
- SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ -> nil end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ {:ok, _create_activity, meta} =
+ SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
- chat = Chat.get(author.id, recipient.ap_id)
+ # The notification gets created
+ assert [notification] = meta[:created_notifications]
+ assert notification.activity_id == create_activity.id
- [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+ # But it is not sent out
+ refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+ refute called(Pleroma.Web.Push.send(notification))
- assert cm_ref.object.data["content"] == "hey"
- assert cm_ref.unread == false
+ chat = Chat.get(author.id, recipient.ap_id)
- chat = Chat.get(recipient.id, author.ap_id)
+ [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
- [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+ assert cm_ref.object.data["content"] == "hey"
+ assert cm_ref.unread == false
- assert cm_ref.object.data["content"] == "hey"
- assert cm_ref.unread == true
+ chat = Chat.get(recipient.id, author.ap_id)
+
+ [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
+
+ assert cm_ref.object.data["content"] == "hey"
+ assert cm_ref.unread == true
+ end
end
test "it creates a Chat for the local users and bumps the unread count" do
diff --git a/test/web/common_api/common_api_test.exs b/test/web/common_api/common_api_test.exs
index 611a9ae66..63b59820e 100644
--- a/test/web/common_api/common_api_test.exs
+++ b/test/web/common_api/common_api_test.exs
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do
alias Pleroma.Activity
alias Pleroma.Chat
alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
@@ -39,15 +40,41 @@ defmodule Pleroma.Web.CommonAPITest do
{:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
- {:ok, activity} =
- CommonAPI.post_chat_message(
- author,
- recipient,
- nil,
- media_id: upload.id
- )
-
- assert activity
+ with_mocks([
+ {
+ Pleroma.Web.Streamer,
+ [],
+ [
+ stream: fn _, _ ->
+ nil
+ end
+ ]
+ },
+ {
+ Pleroma.Web.Push,
+ [],
+ [
+ send: fn _ -> nil end
+ ]
+ }
+ ]) do
+ {:ok, activity} =
+ CommonAPI.post_chat_message(
+ author,
+ recipient,
+ nil,
+ media_id: upload.id
+ )
+
+ notification =
+ Notification.for_user_and_activity(recipient, activity)
+ |> Repo.preload(:activity)
+
+ assert called(Pleroma.Web.Push.send(notification))
+ assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
+
+ assert activity
+ end
end
test "it adds html newlines" do