# Source path: lib/pleroma/workers/attachments_cleanup_worker.ex
# Source blob: 24694cc67f412eb85f9d3f8e24e92a1442f77f1b
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Workers.AttachmentsCleanupWorker do
  import Ecto.Query

  alias Pleroma.Object
  alias Pleroma.Repo
  alias Pleroma.Web.MediaProxy

  use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"

  # Cleans up the uploads that belonged to a deleted object.
  #
  # For every attachment href of the object it looks up the `Object` rows that
  # reference the href, deletes the actor's row and - only when no other row
  # shares the href - removes the underlying file through the configured
  # uploader. When media proxy invalidation is enabled, the hrefs that are not
  # shared are passed to `lock_attachments/2` before the deletion and to
  # `cache_purge/2` after it.
  #
  # Returns `{:ok, :success}` once the cleanup ran, or `{:ok, :skip}` when the
  # object carries no attachments (or no actor).
  @impl Oban.Worker
  def perform(
        %{
          "op" => "cleanup_attachments",
          "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
        },
        _job
      ) do
    hrefs =
      Enum.flat_map(attachments, fn attachment ->
        Enum.map(attachment["url"], & &1["href"])
      end)

    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])

    # Without a configured upload base URL the files live under "/media" of the
    # instance URL; with one, the base URL already points at the upload root.
    prefix =
      case Pleroma.Config.get([Pleroma.Upload, :base_url]) do
        nil -> "media"
        _ -> ""
      end

    base_url =
      String.trim_trailing(
        Pleroma.Config.get([Pleroma.Upload, :base_url], Pleroma.Web.base_url()),
        "/"
      )

    upload_prefix = "#{base_url}/#{prefix}"

    # Find all objects that are copies of the attachments; name and actor do
    # not matter for the lookup itself.
    {object_ids, attachment_urls, exclude_urls} =
      hrefs
      |> fetch_objects()
      |> prepare_objects(actor, Enum.map(attachments, & &1["name"]))
      |> partition_by_usage()

    # Computed once so the lock and the purge operate on the same inputs.
    invalidation_enabled = MediaProxy.Invalidation.enabled?()
    purge_urls = hrefs -- exclude_urls

    lock_attachments(invalidation_enabled, purge_urls)

    Enum.each(attachment_urls, fn href ->
      # Strip the upload prefix exactly once to obtain the uploader-relative path.
      href
      |> String.replace_prefix(upload_prefix, "")
      |> uploader.delete_file()
    end)

    delete_objects(object_ids)

    cache_purge(invalidation_enabled, purge_urls)

    {:ok, :success}
  end

  def perform(%{"op" => "cleanup_attachments", "object" => _object}, _job), do: {:ok, :skip}

  # Splits the `href => %{id: id, count: count}` map built by `prepare_objects/3`
  # into `{object_ids, deletable_hrefs, shared_hrefs}`. Every object id is
  # scheduled for deletion; an href is deletable only when exactly one object
  # references it. Lists are built by prepending and reversed once at the end
  # so the original iteration order is kept without quadratic appends.
  defp partition_by_usage(objects_by_href) do
    {ids, deletable, shared} =
      Enum.reduce(objects_by_href, {[], [], []}, fn
        {href, %{id: id, count: 1}}, {ids, deletable, shared} ->
          {[id | ids], [href | deletable], shared}

        {href, %{id: id, count: _}}, {ids, deletable, shared} ->
          {[id | ids], deletable, [href | shared]}
      end)

    {Enum.reverse(ids), Enum.reverse(deletable), Enum.reverse(shared)}
  end

  # Deletes the `Object` rows with the given ids in a single query.
  # Anything other than a non-empty list is a no-op that returns `:ok`.
  defp delete_objects(object_ids) do
    case object_ids do
      [_ | _] ->
        Object
        |> where([o], o.id in ^object_ids)
        |> Repo.delete_all()

      _ ->
        :ok
    end
  end

  # Purges the given URLs through `MediaProxy.Invalidation` when the first
  # argument is `true` and the URL list is non-empty; otherwise returns `:ok`.
  defp cache_purge(enabled, urls) do
    case {enabled, urls} do
      {true, [_ | _]} -> MediaProxy.Invalidation.purge(urls)
      _ -> :ok
    end
  end

  # When the first argument is `true` and the URL list is non-empty, keeps only
  # the URLs accepted by `MediaProxy.is_url_proxiable?/1` and hands them to
  # `MediaProxy.put_in_deleted_urls/1`. Any other input returns `:ok`.
  defp lock_attachments(enabled, urls) do
    case {enabled, urls} do
      {true, [_ | _]} ->
        proxiable = Enum.filter(urls, &MediaProxy.is_url_proxiable?/1)
        MediaProxy.put_in_deleted_urls(proxiable)

      _ ->
        :ok
    end
  end

  # Groups the fetched objects by attachment href into
  # `%{href => %{id: object_id, count: number_of_objects}}`.
  #
  # Exactly one object is deleted per href, but the file itself must be kept
  # when more than one object references it, hence the count. The stored id
  # starts as the first object seen for the href and is replaced whenever a
  # later object belongs to `actor` and carries one of `names`.
  def prepare_objects(objects, actor, names) do
    Enum.reduce(objects, %{}, fn
      %{id: id, data: %{"url" => [%{"href" => href}], "actor" => owner, "name" => name}},
      by_href ->
        case Map.fetch(by_href, href) do
          :error ->
            # first object seen for this href
            Map.put(by_href, href, %{id: id, count: 1})

          {:ok, %{count: seen} = entry} ->
            if owner == actor and name in names do
              # the actor's own object: this is the one that will be deleted
              Map.put(by_href, href, %{entry | id: id, count: seen + 1})
            else
              # someone else's object: only bump the count so the file is kept
              Map.put(by_href, href, %{entry | count: seen + 1})
            end
        end
    end)
  end

  # Loads every `Object` whose `data -> url` is a JSON array containing at
  # least one entry whose `href` is in `hrefs`.
  def fetch_objects(hrefs) do
    query =
      where(
        Object,
        [o],
        fragment(
          "to_jsonb(array(select jsonb_array_elements((?)#>'{url}') ->> 'href' where jsonb_typeof((?)#>'{url}') = 'array'))::jsonb \\?| (?)",
          o.data,
          o.data,
          ^hrefs
        )
      )

    # This query can be time-consuming on large instances until the way
    # uploads are stored is refactored, so no timeout is applied.
    Repo.all(query, timeout: :infinity)
  end
end