# path: root/priv/repo/migrations/20210128092834_remove_duplicates_from_activity_expiration_queue.exs
# blob: a666d5cd490a14897d0a8e9c5c8cfe798236dda8 (plain)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Repo.Migrations.RemoveDuplicatesFromActivityExpirationQueue do
  @moduledoc """
  Deletes duplicate scheduled `Pleroma.Workers.PurgeExpiredActivity` jobs from
  the `activity_expiration` Oban queue.

  Jobs are grouped by the `activity_id` key of their `args`. For every group
  that holds more than one scheduled job, only the job with the highest id is
  kept; the others are deleted.
  """

  use Ecto.Migration

  import Ecto.Query, only: [from: 2]

  @doc """
  Collects the ids of the duplicate jobs and deletes them in a single query.

  Returns the result of `Pleroma.Repo.delete_all/1`.
  """
  def up do
    duplicate_ids =
      from(j in Oban.Job,
        where: j.queue == "activity_expiration",
        where: j.worker == "Pleroma.Workers.PurgeExpiredActivity",
        where: j.state == "scheduled",
        # Only the job ids of each group are needed; the grouping key and the
        # count are used by `group_by`/`having` but never read afterwards.
        select: fragment("array_agg(?)", j.id),
        group_by: fragment("(?)->>'activity_id'", j.args),
        having: count(j.id) > 1
      )
      |> Pleroma.Repo.all()
      # Keep the job with the highest id in each group, drop the rest.
      |> Enum.flat_map(fn ids -> List.delete(ids, Enum.max(ids)) end)

    from(j in Oban.Job, where: j.id in ^duplicate_ids)
    |> Pleroma.Repo.delete_all()
  end

  @doc """
  No-op: the deleted duplicate jobs are not restored on rollback.
  """
  def down, do: :noop
end