summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaksim Pechnikov <parallel588@gmail.com>2020-08-04 21:17:51 +0300
committerMaksim Pechnikov <parallel588@gmail.com>2020-08-04 21:17:51 +0300
commitaa84f27df64e51afe98db58c036d2ed740715063 (patch)
treeaa46cd1e8f26d6dc941bdefb3b4afe299564dcfd
parenta545c6e1e68ffad18853eeee9868dfafa60a3c23 (diff)
added stream fetch objectsissue/1969
-rw-r--r--lib/pleroma/repo_streamer.ex4
-rw-r--r--lib/pleroma/workers/attachments_cleanup_worker.ex48
2 files changed, 25 insertions, 27 deletions
diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex
index cb4d7bb7a..bfa490765 100644
--- a/lib/pleroma/repo_streamer.ex
+++ b/lib/pleroma/repo_streamer.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.RepoStreamer do
alias Pleroma.Repo
import Ecto.Query
- def chunk_stream(query, chunk_size) do
+ def chunk_stream(query, chunk_size, opts \\ []) do
Stream.unfold(0, fn
:halt ->
{[], :halt}
@@ -16,7 +16,7 @@ defmodule Pleroma.RepoStreamer do
|> order_by(asc: :id)
|> where([r], r.id > ^last_id)
|> limit(^chunk_size)
- |> Repo.all()
+ |> Repo.all(opts)
|> case do
[] ->
{[], :halt}
diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex
index 58226b395..aa8ee2605 100644
--- a/lib/pleroma/workers/attachments_cleanup_worker.ex
+++ b/lib/pleroma/workers/attachments_cleanup_worker.ex
@@ -10,6 +10,8 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
+ @batch_size 500
+
@impl Oban.Worker
def perform(%Job{
args: %{
@@ -19,8 +21,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
}) do
attachments
|> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
- |> fetch_objects
- |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
+ |> fetch_objects(actor, Enum.map(attachments, & &1["name"]))
|> filter_objects
|> do_clean
@@ -71,17 +72,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
end)
end
- defp prepare_objects(objects, actor, names) do
- objects
- |> Enum.reduce(%{}, fn %{
- id: id,
- data: %{
- "url" => [%{"href" => href}],
- "actor" => obj_actor,
- "name" => name
- }
- },
- acc ->
+ defp prepare_objects(init, objects, actor, names) do
+ Enum.reduce(objects, init, fn %{
+ id: id,
+ data: %{
+ "url" => [%{"href" => href}],
+ "actor" => obj_actor,
+ "name" => name
+ }
+ },
+ acc ->
Map.update(acc, href, %{id: id, count: 1}, fn val ->
case obj_actor == actor and name in names do
true ->
@@ -96,18 +96,16 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do
end)
end
- defp fetch_objects(hrefs) do
- from(o in Object,
- where:
- fragment(
- "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
- o.data,
- o.data,
- ^hrefs
- )
+ defp fetch_objects(hrefs, actor, names) do
+ from(
+ o in Object,
+ where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs)
)
- # The query above can be time consumptive on large instances until we
- # refactor how uploads are stored
- |> Repo.all(timeout: :infinity)
+ |> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity)
+ |> Stream.transform(%{}, fn objs, acc ->
+ res = prepare_objects(acc, objs, actor, names)
+ {res, res}
+ end)
+ |> Enum.to_list()
end
end