summaryrefslogtreecommitdiff
path: root/lib/pleroma/workers/attachments_cleanup_worker.ex
blob: aa8ee26052597e5ddb258b6b774965d5e2ba6707 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Workers.AttachmentsCleanupWorker do
  @moduledoc """
  Worker for the `"attachments_cleanup"` queue.

  Handles `"cleanup_attachments"` jobs. For the attachments listed in the job's
  object it looks up every stored `Pleroma.Object` whose attachment URLs overlap
  with them, deletes one object row per URL and asks the configured uploader to
  delete the files that are referenced by exactly one object.
  """

  import Ecto.Query

  alias Pleroma.Object
  alias Pleroma.Repo

  use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"

  # Number of rows requested per chunk from Pleroma.RepoStreamer.chunk_stream/3.
  @batch_size 500

  # This clause only matches jobs whose object carries a non-empty "attachment"
  # list and an "actor"; every other "cleanup_attachments" job falls through to
  # the clause below and is skipped.
  @impl Oban.Worker
  def perform(%Job{
        args: %{
          "op" => "cleanup_attachments",
          "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
        }
      }) do
    attachments
    # collect the "href" of every "url" entry of every attachment
    |> Enum.flat_map(fn item -> Enum.map(item["url"], & &1["href"]) end)
    |> fetch_objects(actor, Enum.map(attachments, & &1["name"]))
    |> filter_objects()
    |> do_clean()

    {:ok, :success}
  end

  def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}), do: {:ok, :skip}

  # Deletes the files behind `attachment_urls` through the configured uploader
  # and then removes the objects identified by `object_ids`.
  defp do_clean({object_ids, attachment_urls}) do
    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])

    # "media" is only part of the URL prefix when no explicit upload base URL
    # is configured.
    prefix =
      case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
        nil -> "media"
        _ -> ""
      end

    base_url =
      String.trim_trailing(
        Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()),
        "/"
      )

    Enum.each(attachment_urls, fn href ->
      href
      # strip the URL prefix so that the uploader receives the remaining path
      |> String.trim_leading("#{base_url}/#{prefix}")
      |> uploader.delete_file()
    end)

    delete_objects(object_ids)
  end

  # Deletes the objects with the given ids; any argument other than a
  # non-empty list is a no-op.
  defp delete_objects([_ | _] = object_ids) do
    Repo.delete_all(from(o in Object, where: o.id in ^object_ids))
  end

  defp delete_objects(_), do: :ok

  # we should delete 1 object for any given attachment, but don't delete
  # files if there are more than 1 object for it
  #
  # Takes an enumerable of `{href, %{id: id, count: count}}` entries and returns
  # `{object_ids, hrefs}`: every entry contributes its id, but only entries with
  # a count of exactly 1 contribute their href. Entries are prepended and both
  # lists reversed once at the end, which keeps the input order without the
  # quadratic cost of appending inside the fold.
  defp filter_objects(objects) do
    {ids, hrefs} =
      Enum.reduce(objects, {[], []}, fn
        {href, %{id: id, count: 1}}, {ids, hrefs} ->
          {[id | ids], [href | hrefs]}

        {_href, %{id: id, count: _count}}, {ids, hrefs} ->
          {[id | ids], hrefs}
      end)

    {Enum.reverse(ids), Enum.reverse(hrefs)}
  end

  # Folds one chunk of objects into `init`, a map of
  # `href => %{id: object_id, count: number_of_objects_seen_for_that_href}`.
  # Only objects with exactly one "url" entry and with "actor" and "name" keys
  # match the function head below.
  defp prepare_objects(init, objects, actor, names) do
    Enum.reduce(objects, init, fn %{
                                    id: id,
                                    data: %{
                                      "url" => [%{"href" => href}],
                                      "actor" => obj_actor,
                                      "name" => name
                                    }
                                  },
                                  acc ->
      Map.update(acc, href, %{id: id, count: 1}, fn val ->
        if obj_actor == actor and name in names do
          # set id of the actor's object that will be deleted
          %{val | id: id, count: val.count + 1}
        else
          # another actor's object, just increase count to not delete file
          %{val | count: val.count + 1}
        end
      end)
    end)
  end

  # Loads, in chunks of @batch_size, every object whose attachment URLs overlap
  # with `hrefs` and returns a list of `{href, %{id: id, count: count}}` tuples.
  #
  # The chunks are folded into a single map and converted to a list only once
  # at the end, so each href appears exactly once with its final count.
  # Emitting the accumulator after every chunk would repeat earlier hrefs with
  # stale counts as soon as the query spans more than one chunk.
  defp fetch_objects(hrefs, actor, names) do
    from(
      o in Object,
      where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs)
    )
    |> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity)
    |> Enum.reduce(%{}, fn objs, acc -> prepare_objects(acc, objs, actor, names) end)
    |> Map.to_list()
  end
end