summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrinpatch <rin@patch.cx>2021-02-24 17:28:49 +0300
committerrinpatch <rin@patch.cx>2021-02-24 18:52:39 +0300
commit4286a383dfff02525ee07285094b7110ce0554fd (patch)
tree5f9038315e8cb57da01cdd36e8641d10b83aa60e
parentd113ed94e7b6b30eecc0a5415a69d1b321e625e5 (diff)
Improve user deletion consistency
An attempt to ensure something like https://git.pleroma.social/pleroma/pleroma/-/issues/1415 does not happen or is at least debuggable. - Deactivate the user before deletion to ensure no new posts/follows can be made during it - Run the deletion in a transaction. This should reduce performance impact of a deletion since it will only use a single connection. Also makes sure an account cannot get stuck in a weird state between deleted and active. Made it possible to disable though, in case someone hits the issue mentioned above. - Log more errors
-rw-r--r--config/config.exs4
-rw-r--r--config/description.exs14
-rw-r--r--config/test.exs4
-rw-r--r--lib/pleroma/user.ex148
-rw-r--r--lib/pleroma/web/activity_pub/pipeline.ex6
-rw-r--r--lib/pleroma/web/streamer.ex9
-rw-r--r--lib/pleroma/workers/background_worker.ex41
7 files changed, 190 insertions, 36 deletions
diff --git a/config/config.exs b/config/config.exs
index 66aee3264..c62f5ca1f 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -652,7 +652,9 @@ config :pleroma, :oauth2,
issue_new_refresh_token: true,
clean_expired_tokens: false
-config :pleroma, :database, rum_enabled: false
+config :pleroma, :database,
+ rum_enabled: false,
+ rollback_on_activity_deletion_errors: true
config :pleroma, :env, Mix.env()
diff --git a/config/description.exs b/config/description.exs
index d9b15e684..5e99e3846 100644
--- a/config/description.exs
+++ b/config/description.exs
@@ -72,6 +72,20 @@ frontend_options = [
config :pleroma, :config_description, [
%{
group: :pleroma,
+ key: :database,
+ type: :group,
+ description: "Database settings",
+ children: [
+ %{
+ key: :rollback_on_activity_deletion_errors,
+ type: :boolean,
+ description:
+ "Rollback the transaction if Pleroma fails to delete an activity during user deletion. If you need to disable this, please report the issue you were having on the bugtracker."
+ }
+ ]
+ },
+ %{
+ group: :pleroma,
key: Pleroma.Upload,
type: :group,
description: "Upload general settings",
diff --git a/config/test.exs b/config/test.exs
index 87396a88d..d87047496 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -133,6 +133,10 @@ config :pleroma, :side_effects,
ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock,
logger: Pleroma.LoggerMock
+# Disable transaction check by default unless the test wants otherwise
+# because all tests run in a transaction.
+config :pleroma, Pleroma.Workers.BackgroundWorker, ignore_transaction_check: true
+
if File.exists?("./config/test.secret.exs") do
import_config "test.secret.exs"
else
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 51f5bc8ea..9837166ea 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -1072,7 +1072,19 @@ defmodule Pleroma.User do
def update_and_set_cache(changeset) do
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
- set_cache(user)
+ BackgroundWorker.execute_or_enqueue_if_in_transaction(fn
+ false ->
+ set_cache(user)
+
+ # If the function has been enqueued, there is a chance something changed
+ # before the worker got to executing it, so refetch the user from the database
+ true ->
+ user.id
+ |> get_by_id()
+ |> set_cache()
+ end)
+
+ {:ok, user}
end
end
@@ -1339,7 +1351,7 @@ defmodule Pleroma.User do
user
|> follow_information_changeset(%{follower_count: follower_count})
- |> update_and_set_cache
+ |> update_and_set_cache()
else
{:ok, maybe_fetch_follow_information(user)}
end
@@ -1747,27 +1759,67 @@ defmodule Pleroma.User do
@spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do
- # Remove all relationships
- user
- |> get_followers()
- |> Enum.each(fn follower ->
- ActivityPub.unfollow(follower, user)
- unfollow(follower, user)
- end)
+ # Deactivate the user before starting the deletion
+ # to make sure they are not able to make new posts/follows during it
+ {:ok, user} = set_activation_status(user, false)
+
+ Repo.transaction(
+ fn ->
+ # Remove all relationships
+ # No need to handle errors from ActivityPub.unfollow because
+ # they will automatically rollback the transaction.
+ user
+ |> get_followers()
+ |> Enum.each(fn follower ->
+ ActivityPub.unfollow(follower, user)
+ unfollow(follower, user)
+ end)
- user
- |> get_friends()
- |> Enum.each(fn followed ->
- ActivityPub.unfollow(user, followed)
- unfollow(user, followed)
- end)
+ user
+ |> get_friends()
+ |> Enum.each(fn followed ->
+ ActivityPub.unfollow(user, followed)
+ unfollow(user, followed)
+ end)
+
+ rollback_on_activity_deletion_errors =
+ Config.get([:database, :rollback_on_activity_deletion_errors], true)
- delete_user_activities(user)
- delete_notifications_from_user_activities(user)
+ case {delete_user_activities(user), rollback_on_activity_deletion_errors} do
+ {res, rollback} when res == :ok or rollback == false ->
+ case res do
+ {:error, _} ->
+ Logger.warn(fn ->
+ "Deleting #{user.ap_id}: Failed deleting some of the activities, proceeding anyway."
+ end)
- delete_outgoing_pending_follow_requests(user)
+ _ ->
+ :noop
+ end
- delete_or_deactivate(user)
+ delete_notifications_from_user_activities(user)
+
+ delete_outgoing_pending_follow_requests(user)
+
+ case delete_or_deactivate(user) do
+ {:ok, user} -> user
+ {:error, e} -> Repo.rollback(e)
+ end
+
+ {{:error, e}, true} ->
+ Logger.error(fn ->
+ """
+ Deleting #{user.ap_id}: Failed deleting some of the activities, rolling back.
+ Set `config :pleroma, :database, rollback_on_activity_deletion_errors: true`
+ and restart the deletion if you want to continue anyway. Please report this on Pleroma bugtracker.
+ """
+ end)
+
+ Repo.rollback({:deleting_activities, e})
+ end
+ end,
+ timeout: :infinity
+ )
end
def perform(:set_activation_async, user, status), do: set_activation(user, status)
@@ -1807,16 +1859,48 @@ defmodule Pleroma.User do
|> Repo.delete_all()
end
+ @type activity_id :: String.t()
+ @spec delete_user_activities(User.t()) ::
+ :ok | {:error, [{:error, activity_id(), any()}]}
def delete_user_activities(%User{ap_id: ap_id} = user) do
- ap_id
- |> Activity.Queries.by_actor()
- |> Repo.chunk_stream(50, :batches)
- |> Stream.each(fn activities ->
- Enum.each(activities, fn activity -> delete_activity(activity, user) end)
- end)
- |> Stream.run()
+ errors =
+ ap_id
+ |> Activity.Queries.by_actor()
+ |> Repo.chunk_stream(50)
+ |> Stream.flat_map(fn activity ->
+ case delete_activity(activity, user) do
+ {:ok, _activity, _meta} ->
+ []
+
+ {:error, error} ->
+ Logger.error(fn ->
+ "Deleting #{ap_id}: could not delete or undo #{activity.data["id"]}.\n Reason: #{
+ inspect(error)
+ }"
+ end)
+
+ [{:error, activity.id, error}]
+
+ :noop ->
+ Logger.debug(fn ->
+ "Deleting #{ap_id}: nothing to do for #{activity.data["id"]} of type #{
+ activity.data["type"]
+ }"
+ end)
+
+ []
+ end
+ end)
+ |> Enum.to_list()
+
+ case errors do
+ [] -> :ok
+ errors -> {:error, errors}
+ end
end
+ @spec delete_activity(Pleroma.Activity.t(), User.t()) ::
+ {:ok, Activity.t(), keyword()} | {:error, any()} | :noop
defp delete_activity(%{data: %{"type" => "Create", "object" => object}} = activity, user) do
with {_, %Object{}} <- {:find_object, Object.get_by_ap_id(object)},
{:ok, delete_data, _} <- Builder.delete(user, object) do
@@ -1831,18 +1915,20 @@ defmodule Pleroma.User do
end
e ->
- Logger.error("Could not delete #{object} created by #{activity.data["ap_id"]}")
- Logger.error("Error: #{inspect(e)}")
+ e
end
end
defp delete_activity(%{data: %{"type" => type}} = activity, user)
when type in ["Like", "Announce"] do
- {:ok, undo, _} = Builder.undo(user, activity)
- Pipeline.common_pipeline(undo, local: user.local)
+ with {:ok, undo, _} <- Builder.undo(user, activity) do
+ Pipeline.common_pipeline(undo, local: user.local)
+ else
+ e -> e
+ end
end
- defp delete_activity(_activity, _user), do: "Doing nothing"
+ defp delete_activity(_activity, _user), do: :noop
defp delete_outgoing_pending_follow_requests(user) do
user
diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex
index 195596f94..405649fb1 100644
--- a/lib/pleroma/web/activity_pub/pipeline.ex
+++ b/lib/pleroma/web/activity_pub/pipeline.ex
@@ -13,6 +13,7 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
alias Pleroma.Web.ActivityPub.SideEffects
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
+ alias Pleroma.Workers.BackgroundWorker
@side_effects Config.get([:pipeline, :side_effects], SideEffects)
@federator Config.get([:pipeline, :federator], Federator)
@@ -26,7 +27,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
def common_pipeline(object, meta) do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
{:ok, {:ok, activity, meta}} ->
- @side_effects.handle_after_transaction(meta)
+ BackgroundWorker.execute_or_enqueue_if_in_transaction(fn ->
+ @side_effects.handle_after_transaction(meta)
+ end)
+
{:ok, activity, meta}
{:ok, value} ->
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index fc3bbb130..f63f16a79 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -18,6 +18,7 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.StreamerView
+ alias Pleroma.Workers.BackgroundWorker
@mix_env Mix.env()
@registry Pleroma.Web.StreamerRegistry
@@ -135,9 +136,11 @@ defmodule Pleroma.Web.Streamer do
def stream(topics, items) do
if should_env_send?() do
- for topic <- List.wrap(topics), item <- List.wrap(items) do
- spawn(fn -> do_stream(topic, item) end)
- end
+ BackgroundWorker.execute_or_enqueue_if_in_transaction(fn ->
+ for topic <- List.wrap(topics), item <- List.wrap(items) do
+ spawn(fn -> do_stream(topic, item) end)
+ end
+ end)
end
end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index 1e28384cb..0465fd06f 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -38,4 +38,45 @@ defmodule Pleroma.Workers.BackgroundWorker do
Pleroma.FollowingRelationship.move_following(origin, target)
end
+
+ def perform(%Job{args: %{"op" => "transaction_side_effects", "function" => encoded_function}}) do
+ function =
+ encoded_function
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+
+ maybe_execute_function_with_worker_info(function, true)
+ :ok
+ end
+
+ @doc "Executes a function right away if not running in transaction. Otherwise enqueues it to be executed by BackgroundWorker after transaction commit. Intended for side effects that can not be rolled back. If the function has an arity of 1, the first argument will be a boolean indicating whether it is run by BackgroundWorker or not."
+ @spec execute_or_enqueue_if_in_transaction((() -> any()) | (boolean() -> any())) ::
+ {:ok, {:enqueued, Oban.Job.t()}}
+ | {:error, {:enqueue, Oban.job_changeset()}}
+ | {:error, {:enqueue, term()}}
+ | {:ok, {:executed, term()}}
+ def execute_or_enqueue_if_in_transaction(function) do
+ if Pleroma.Repo.in_transaction?() and
+ !Pleroma.Config.get([__MODULE__, :ignore_transaction_check], false) do
+ encoded_function =
+ function
+ |> :erlang.term_to_binary()
+ |> Base.encode64()
+
+ case enqueue("transaction_side_effects", %{"function" => encoded_function}) do
+ {:ok, job} -> {:ok, {:enqueued, job}}
+ {:error, e} -> {:error, {:enqueue, e}}
+ end
+ else
+ {:ok, {:executed, maybe_execute_function_with_worker_info(function, false)}}
+ end
+ end
+
+ defp maybe_execute_function_with_worker_info(function, executed_by_worker) do
+ if :erlang.fun_info(function)[:arity] == 1 do
+ function.(executed_by_worker)
+ else
+ function.()
+ end
+ end
end