From aa84f27df64e51afe98db58c036d2ed740715063 Mon Sep 17 00:00:00 2001 From: Maksim Pechnikov Date: Tue, 4 Aug 2020 21:17:51 +0300 Subject: added stream fetch objects --- lib/pleroma/repo_streamer.ex | 4 +- lib/pleroma/workers/attachments_cleanup_worker.ex | 48 +++++++++++------------ 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/lib/pleroma/repo_streamer.ex b/lib/pleroma/repo_streamer.ex index cb4d7bb7a..bfa490765 100644 --- a/lib/pleroma/repo_streamer.ex +++ b/lib/pleroma/repo_streamer.ex @@ -6,7 +6,7 @@ defmodule Pleroma.RepoStreamer do alias Pleroma.Repo import Ecto.Query - def chunk_stream(query, chunk_size) do + def chunk_stream(query, chunk_size, opts \\ []) do Stream.unfold(0, fn :halt -> {[], :halt} @@ -16,7 +16,7 @@ def chunk_stream(query, chunk_size) do |> order_by(asc: :id) |> where([r], r.id > ^last_id) |> limit(^chunk_size) - |> Repo.all() + |> Repo.all(opts) |> case do [] -> {[], :halt} diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 58226b395..aa8ee2605 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -10,6 +10,8 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup" + @batch_size 500 + @impl Oban.Worker def perform(%Job{ args: %{ @@ -19,8 +21,7 @@ def perform(%Job{ }) do attachments |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end) - |> fetch_objects - |> prepare_objects(actor, Enum.map(attachments, & &1["name"])) + |> fetch_objects(actor, Enum.map(attachments, & &1["name"])) |> filter_objects |> do_clean @@ -71,17 +72,16 @@ defp filter_objects(objects) do end) end - defp prepare_objects(objects, actor, names) do - objects - |> Enum.reduce(%{}, fn %{ - id: id, - data: %{ - "url" => [%{"href" => href}], - "actor" => obj_actor, - "name" => name - } - }, - acc -> + defp prepare_objects(init, objects, actor, names) do + Enum.reduce(objects, init, fn %{ + id: id, + data: %{ + "url" => [%{"href" => href}], + "actor" => obj_actor, + "name" => name + } + }, + acc -> Map.update(acc, href, %{id: id, count: 1}, fn val -> case obj_actor == actor and name in names do true -> @@ -96,18 +96,16 @@ defp prepare_objects(objects, actor, names) do end) end - defp fetch_objects(hrefs) do - from(o in Object, - where: - fragment( - "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)", - o.data, - o.data, - ^hrefs - ) + defp fetch_objects(hrefs, actor, names) do + from( + o in Object, + where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs) ) - # The query above can be time consumptive on large instances until we - # refactor how uploads are stored - |> Repo.all(timeout: :infinity) + |> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity) + |> Stream.transform(%{}, fn objs, acc -> + res = prepare_objects(acc, objs, actor, names) + {res, res} + end) + |> Enum.to_list() end end -- cgit v1.2.3