1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.AttachmentsCleanupWorker do
import Ecto.Query
alias Pleroma.Object
alias Pleroma.Repo
use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup"
@batch_size 500
# Oban callback for the "cleanup_attachments" operation.
#
# When the object data carries a non-empty "attachment" list and an "actor",
# every href of every attachment is collected, the objects referencing those
# hrefs are looked up, and the resulting files and objects are removed.
# This path always answers `{:ok, :success}`.
@impl Oban.Worker
def perform(%Job{
      args: %{
        "op" => "cleanup_attachments",
        "object" => %{"data" => %{"attachment" => [_ | _] = attachments, "actor" => actor}}
      }
    }) do
  hrefs = for attachment <- attachments, url <- attachment["url"], do: url["href"]
  names = for attachment <- attachments, do: attachment["name"]

  hrefs
  |> fetch_objects(actor, names)
  |> filter_objects()
  |> do_clean()

  {:ok, :success}
end

# Any other "cleanup_attachments" job (object data without a non-empty
# attachment list or without an actor) has nothing to clean and is skipped.
def perform(%Job{args: %{"op" => "cleanup_attachments", "object" => _object}}) do
  {:ok, :skip}
end
# Deletes the files behind `attachment_urls` through the configured uploader,
# then removes the objects listed in `object_ids`.
#
# Each href has the upload base URL stripped from its front before it is handed
# to the uploader's `delete_file/1`. When no `[Pleroma.Upload, :base_url]` is
# configured, the instance base URL is used and the "media" segment is
# stripped along with it.
defp do_clean({object_ids, attachment_urls}) do
  uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])

  prefix =
    if is_nil(Pleroma.Config.get([Pleroma.Upload, :base_url])), do: "media", else: ""

  base_url =
    [Pleroma.Upload, :base_url]
    |> Pleroma.Config.get(Pleroma.Web.base_url())
    |> String.trim_trailing("/")

  # The stripped prefix is the same for every href, so build it once.
  url_prefix = "#{base_url}/#{prefix}"

  Enum.each(attachment_urls, fn href ->
    relative_path = String.trim_leading(href, url_prefix)
    uploader.delete_file(relative_path)
  end)

  delete_objects(object_ids)
end
# Removes every object whose id is in `object_ids` with a single query.
# Anything other than a non-empty list is a no-op that returns `:ok`.
defp delete_objects(object_ids) when is_list(object_ids) and object_ids != [] do
  Object
  |> where([o], o.id in ^object_ids)
  |> Repo.delete_all()
end

defp delete_objects(_object_ids), do: :ok
# We should delete 1 object for any given attachment, but must not delete
# files if there is more than 1 object for them.
#
# Takes an enumerable of `{href, %{id: id, count: count}}` entries and returns
# `{object_ids, hrefs}`:
#   * every entry contributes its object id to `object_ids`;
#   * only entries whose count is exactly 1 contribute their href to `hrefs`.
# Both lists keep the order of the input.
defp filter_objects(objects) do
  {ids, hrefs} =
    Enum.reduce(objects, {[], []}, fn
      {href, %{id: id, count: 1}}, {ids, hrefs} -> {[id | ids], [href | hrefs]}
      {_href, %{id: id, count: _count}}, {ids, hrefs} -> {[id | ids], hrefs}
    end)

  # The accumulators are built by prepending (O(1) per entry instead of the
  # O(n) of `++`), so reverse them once to restore input order.
  {Enum.reverse(ids), Enum.reverse(hrefs)}
end
# Folds `objects` into `init`, a map keyed by attachment href whose values are
# `%{id: object_id, count: number_of_objects_seen_for_that_href}`.
#
# The first object seen for an href seeds the entry with its own id and a
# count of 1. Every further object for the same href bumps the count; the
# stored id is replaced only when that object belongs to `actor` and its name
# is one of `names`.
#
# Each object must match `%{id: _, data: %{"url" => [%{"href" => _}],
# "actor" => _, "name" => _}}`; anything else raises a FunctionClauseError.
defp prepare_objects(init, objects, actor, names) do
  Enum.reduce(objects, init, fn
    %{
      id: id,
      data: %{"url" => [%{"href" => href}], "actor" => obj_actor, "name" => name}
    },
    by_href ->
      Map.update(by_href, href, %{id: id, count: 1}, fn entry ->
        if obj_actor == actor and name in names do
          # the actor's own object: this is the one that will be deleted
          %{entry | id: id, count: entry.count + 1}
        else
          # someone else's object: only bump the count so the file is kept
          %{entry | count: entry.count + 1}
        end
      end)
  end)
end
# Looks up every object whose attachment URLs overlap with `hrefs` and
# summarises them per href.
#
# The query is consumed in batches of `@batch_size` through
# `Pleroma.RepoStreamer.chunk_stream/3`, and each batch is folded into one
# shared map by `prepare_objects/4`. The result is a list of
# `{href, %{id: id, count: count}}` tuples with exactly one tuple per href.
#
# The batches must be folded into a single accumulator and converted to a list
# only once at the end. Emitting the accumulated map after every batch (as a
# `Stream.transform/3` returning `{acc, acc}` does) repeats hrefs with stale
# counts as soon as there is more than one batch, which can make a file look
# unreferenced when it is not.
defp fetch_objects(hrefs, actor, names) do
  from(
    o in Object,
    where: fragment("object_attachment_urls(?) && (?)", o.data, ^hrefs)
  )
  |> Pleroma.RepoStreamer.chunk_stream(@batch_size, timeout: :infinity)
  |> Enum.reduce(%{}, fn batch, by_href -> prepare_objects(by_href, batch, actor, names) end)
  |> Map.to_list()
end
end
|