diff options
Diffstat (limited to 'lib/pleroma/workers')
-rw-r--r-- | lib/pleroma/workers/attachments_cleanup_worker.ex | 158 | ||||
-rw-r--r-- | lib/pleroma/workers/background_worker.ex | 34 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/clear_oauth_token_worker.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/digest_emails_worker.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/new_users_digest_worker.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/purge_expired_activities_worker.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/workers/cron/stats_worker.ex | 3 | ||||
-rw-r--r-- | lib/pleroma/workers/mailer_worker.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/workers/publisher_worker.ex | 6 | ||||
-rw-r--r-- | lib/pleroma/workers/receiver_worker.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/workers/remote_fetcher_worker.ex | 8 | ||||
-rw-r--r-- | lib/pleroma/workers/scheduled_activity_worker.ex | 4 | ||||
-rw-r--r-- | lib/pleroma/workers/transmogrifier_worker.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/workers/web_pusher_worker.ex | 2 | ||||
-rw-r--r-- | lib/pleroma/workers/worker_helper.ex | 4 |
15 files changed, 134 insertions, 109 deletions
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 3c5820a86..58226b395 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -11,85 +11,103 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup" @impl Oban.Worker - def perform( - %{ + def perform(%Job{ + args: %{ "op" => "cleanup_attachments", "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}} - }, - _job - ) do - hrefs = - Enum.flat_map(attachments, fn attachment -> - Enum.map(attachment["url"], & &1["href"]) - end) + } + }) do + attachments + |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end) + |> fetch_objects + |> prepare_objects(actor, Enum.map(attachments, & &1["name"])) + |> filter_objects + |> do_clean + + {:ok, :success} + end - names = Enum.map(attachments, & &1["name"]) + def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip} + defp do_clean({object_ids, attachment_urls}) do uploader = Pleroma.Config.get([Pleroma.Upload, :uploader]) - # find all objects for copies of the attachments, name and actor doesn't matter here - delete_ids = - from(o in Object, - where: - fragment( - "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)", - o.data, - o.data, - ^hrefs - ) + prefix = + case Pleroma.Config.get([Pleroma.Upload, :base_url]) do + nil -> "media" + _ -> "" + end + + base_url = + String.trim_trailing( + Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()), + "/" ) - # The query above can be time consumptive on large instances until we - # refactor how uploads are stored - |> Repo.all(timeout: :infinity) - # we should delete 1 object for any given attachment, but don't delete - # files if there are more than 1 object for it - |> Enum.reduce(%{}, fn %{ - id: id, - data: %{ - "url" => [%{"href" => href}], - "actor" => obj_actor, - "name" => name - } - }, - acc -> - Map.update(acc, href, %{id: id, count: 1}, fn val -> - case obj_actor == actor and name in names do - true -> - # set id of the actor's object that will be deleted - %{val | id: id, count: val.count + 1} - - false -> - # another actor's object, just increase count to not delete file - %{val | count: val.count + 1} - end - end) - end) - |> Enum.map(fn {href, %{id: id, count: count}} -> - # only delete files that have single instance - with 1 <- count do - prefix = - case Pleroma.Config.get([Pleroma.Upload, :base_url]) do - nil -> "media" - _ -> "" - end - - base_url = - String.trim_trailing( - Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()), - "/" - ) - - file_path = String.trim_leading(href, "#{base_url}/#{prefix}") - - uploader.delete_file(file_path) - end - id - end) + Enum.each(attachment_urls, fn href -> + href + |> String.trim_leading("#{base_url}/#{prefix}") + |> uploader.delete_file() + end) + + delete_objects(object_ids) + end - from(o in Object, where: o.id in ^delete_ids) - |> Repo.delete_all() + defp delete_objects([_ | _] = object_ids) do + Repo.delete_all(from(o in Object, where: o.id in ^object_ids)) end - def perform(%{"op" => "cleanup_attachments", "object" => _object}, _job), do: :ok + defp delete_objects(_), do: :ok + + # we should delete 1 object for any given attachment, but don't delete + # files if there are more than 1 object for it + defp filter_objects(objects) do + Enum.reduce(objects, {[], []}, fn {href, %{id: id, count: count}}, {ids, hrefs} -> + with 1 <- count do + {ids ++ [id], hrefs ++ [href]} + else + _ -> {ids ++ [id], hrefs} + end + end) + end + + defp prepare_objects(objects, actor, names) do + objects + |> Enum.reduce(%{}, fn %{ + id: id, + data: %{ + "url" => [%{"href" => href}], + "actor" => obj_actor, + "name" => name + } + }, + acc -> + Map.update(acc, href, %{id: id, count: 1}, fn val -> + case obj_actor == actor and name in names do + true -> + # set id of the actor's object that will be deleted + %{val | id: id, count: val.count + 1} + + false -> + # another actor's object, just increase count to not delete file + %{val | count: val.count + 1} + end + end) + end) + end + + defp fetch_objects(hrefs) do + from(o in Object, + where: + fragment( + "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)", + o.data, + o.data, + ^hrefs + ) + ) + # The query above can be time consumptive on large instances until we + # refactor how uploads are stored + |> Repo.all(timeout: :infinity) + end end diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 57c3a9c3a..cec5a7462 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -11,59 +11,59 @@ defmodule Pleroma.Workers.BackgroundWorker do @impl Oban.Worker - def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do + def perform(%Job{args: %{"op" => "deactivate_user", "user_id" => user_id, "status" => status}}) do user = User.get_cached_by_id(user_id) User.perform(:deactivate_async, user, status) end - def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do + def perform(%Job{args: %{"op" => "delete_user", "user_id" => user_id}}) do user = User.get_cached_by_id(user_id) User.perform(:delete, user) end - def perform(%{"op" => "force_password_reset", "user_id" => user_id}, _job) do + def perform(%Job{args: %{"op" => "force_password_reset", "user_id" => user_id}}) do user = User.get_cached_by_id(user_id) User.perform(:force_password_reset, user) end - def perform( - %{ + def perform(%Job{ + args: %{ "op" => "blocks_import", "blocker_id" => blocker_id, "blocked_identifiers" => blocked_identifiers - }, - _job - ) do + } + }) do blocker = User.get_cached_by_id(blocker_id) {:ok, User.perform(:blocks_import, blocker, blocked_identifiers)} end - def perform( - %{ + def perform(%Job{ + args: %{ "op" => "follow_import", "follower_id" => follower_id, "followed_identifiers" => followed_identifiers - }, - _job - ) do + } + }) do follower = User.get_cached_by_id(follower_id) {:ok, User.perform(:follow_import, follower, followed_identifiers)} end - def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do + def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do MediaProxyWarmingPolicy.perform(:preload, message) end - def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do + def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do MediaProxyWarmingPolicy.perform(:prefetch, url) end - def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do + def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do activity = Activity.get_by_id(activity_id) Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) end - def perform(%{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}, _) do + def perform(%Job{ + args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} + }) do origin = User.get_cached_by_id(origin_id) target = User.get_cached_by_id(target_id) diff --git a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex index 341eff054..276f47efc 100644 --- a/lib/pleroma/workers/cron/clear_oauth_token_worker.ex +++ b/lib/pleroma/workers/cron/clear_oauth_token_worker.ex @@ -13,9 +13,11 @@ defmodule Pleroma.Workers.Cron.ClearOauthTokenWorker do alias Pleroma.Web.OAuth.Token @impl Oban.Worker - def perform(_opts, _job) do + def perform(_job) do if Config.get([:oauth2, :clean_expired_tokens], false) do Token.delete_expired_tokens() end + + :ok end end diff --git a/lib/pleroma/workers/cron/digest_emails_worker.ex b/lib/pleroma/workers/cron/digest_emails_worker.ex index dd13c3b17..0c56f00fb 100644 --- a/lib/pleroma/workers/cron/digest_emails_worker.ex +++ b/lib/pleroma/workers/cron/digest_emails_worker.ex @@ -19,7 +19,7 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do require Logger @impl Oban.Worker - def perform(_opts, _job) do + def perform(_job) do config = Config.get([:email_notifications, :digest]) if config[:active] do @@ -38,6 +38,8 @@ defmodule Pleroma.Workers.Cron.DigestEmailsWorker do |> Repo.all() |> send_emails end + + :ok end def send_emails(users) do diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex index 9bd0a5621..8bbaed83d 100644 --- a/lib/pleroma/workers/cron/new_users_digest_worker.ex +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -12,7 +12,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do use Pleroma.Workers.WorkerHelper, queue: "new_users_digest" @impl Oban.Worker - def perform(_args, _job) do + def perform(_job) do if Pleroma.Config.get([Pleroma.Emails.NewUsersDigestEmail, :enabled]) do today = NaiveDateTime.utc_now() |> Timex.beginning_of_day() @@ -57,5 +57,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do |> Enum.each(&Pleroma.Emails.Mailer.deliver/1) end end + + :ok end end diff --git a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex index b8953dd7f..6549207fc 100644 --- a/lib/pleroma/workers/cron/purge_expired_activities_worker.ex +++ b/lib/pleroma/workers/cron/purge_expired_activities_worker.ex @@ -20,10 +20,12 @@ defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do @interval :timer.minutes(1) @impl Oban.Worker - def perform(_opts, _job) do + def perform(_job) do if Config.get([ActivityExpiration, :enabled]) do Enum.each(ActivityExpiration.due_expirations(@interval), &delete_activity/1) end + after + :ok end def delete_activity(%ActivityExpiration{activity_id: activity_id}) do @@ -39,7 +41,7 @@ defmodule Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker do {:user, _} -> Logger.error( - "#{__MODULE__} Couldn't delete expired activity: not found actorof ##{activity_id}" + "#{__MODULE__} Couldn't delete expired activity: not found actor of ##{activity_id}" ) end end diff --git a/lib/pleroma/workers/cron/stats_worker.ex b/lib/pleroma/workers/cron/stats_worker.ex index e9b8d59c4..6a79540bc 100644 --- a/lib/pleroma/workers/cron/stats_worker.ex +++ b/lib/pleroma/workers/cron/stats_worker.ex @@ -10,7 +10,8 @@ defmodule Pleroma.Workers.Cron.StatsWorker do use Oban.Worker, queue: "background" @impl Oban.Worker - def perform(_opts, _job) do + def perform(_job) do Pleroma.Stats.do_collect() + :ok end end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex index 6955338a5..32273cfa5 100644 --- a/lib/pleroma/workers/mailer_worker.ex +++ b/lib/pleroma/workers/mailer_worker.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Workers.MailerWorker do use Pleroma.Workers.WorkerHelper, queue: "mailer" @impl Oban.Worker - def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do + def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do encoded_email |> Base.decode64!() |> :erlang.binary_to_term() diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex index daf79efc0..e739c3cd0 100644 --- a/lib/pleroma/workers/publisher_worker.ex +++ b/lib/pleroma/workers/publisher_worker.ex @@ -8,17 +8,17 @@ defmodule Pleroma.Workers.PublisherWorker do use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" - def backoff(attempt) when is_integer(attempt) do + def backoff(%Job{attempt: attempt}) when is_integer(attempt) do Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) end @impl Oban.Worker - def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do + def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id}}) do activity = Activity.get_by_id(activity_id) Federator.perform(:publish, activity) end - def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do + def perform(%Job{args: %{"op" => "publish_one", "module" => module_name, "params" => params}}) do params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end) Federator.perform(:publish_one, String.to_atom(module_name), params) end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index f7a7124f3..1b97af1a8 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Workers.ReceiverWorker do use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" @impl Oban.Worker - def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do + def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do Federator.perform(:incoming_ap_doc, params) end end diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex index ec6534f21..27e2e3386 100644 --- a/lib/pleroma/workers/remote_fetcher_worker.ex +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -8,13 +8,7 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher" @impl Oban.Worker - def perform( - %{ - "op" => "fetch_remote", - "id" => id - } = args, - _job - ) do + def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do {:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"]) end end diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index 8905f4ad0..dd9986fe4 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -17,7 +17,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do require Logger @impl Oban.Worker - def perform(%{"activity_id" => activity_id}, _job) do + def perform(%Job{args: %{"activity_id" => activity_id}}) do if Config.get([ScheduledActivity, :enabled]) do case Pleroma.Repo.get(ScheduledActivity, activity_id) do %ScheduledActivity{} = scheduled_activity -> @@ -30,6 +30,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do end defp post_activity(%ScheduledActivity{user_id: user_id, params: params} = scheduled_activity) do + params = Map.new(params, fn {key, value} -> {String.to_existing_atom(key), value} end) + with {:delete, {:ok, _}} <- {:delete, ScheduledActivity.delete(scheduled_activity)}, {:user, %User{} = user} <- {:user, User.get_cached_by_id(user_id)}, {:post, {:ok, _}} <- {:post, CommonAPI.post(user, params)} do diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex index 11239ca5e..15f36375c 100644 --- a/lib/pleroma/workers/transmogrifier_worker.ex +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Workers.TransmogrifierWorker do use Pleroma.Workers.WorkerHelper, queue: "transmogrifier" @impl Oban.Worker - def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do + def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do user = User.get_cached_by_id(user_id) Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) end diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex index 58ad25e39..0cfdc6a6f 100644 --- a/lib/pleroma/workers/web_pusher_worker.ex +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -9,7 +9,7 @@ defmodule Pleroma.Workers.WebPusherWorker do use Pleroma.Workers.WorkerHelper, queue: "web_push" @impl Oban.Worker - def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do + def perform(%Job{args: %{"op" => "web_push", "notification_id" => notification_id}}) do notification = Notification |> Repo.get(notification_id) diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex index d1f90c35b..7d1289be2 100644 --- a/lib/pleroma/workers/worker_helper.ex +++ b/lib/pleroma/workers/worker_helper.ex @@ -32,6 +32,8 @@ defmodule Pleroma.Workers.WorkerHelper do queue: unquote(queue), max_attempts: 1 + alias Oban.Job + def enqueue(op, params, worker_args \\ []) do params = Map.merge(%{"op" => op}, params) queue_atom = String.to_atom(unquote(queue)) @@ -39,7 +41,7 @@ defmodule Pleroma.Workers.WorkerHelper do unquote(caller_module) |> apply(:new, [params, worker_args]) - |> Pleroma.Repo.insert() + |> Oban.insert() end end end |