diff options
author Ivan Tashkinov <> 2021-03-12 12:18:11 +0300
committer Ivan Tashkinov <> 2021-03-12 12:18:11 +0300
commit 3edf45021eb6c3fba06bc083b346f7db54cd073f (patch)
parent fbcddd812627283cbf320608cf692b38ff64c0b9 (diff)
[#3213] Background migration infrastructure refactoring.
Extracted BaseMigrator and BaseMigratorState.
5 files changed, 385 insertions, 321 deletions
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 2ff7562e2..06d399b2e 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -103,10 +103,7 @@ defmodule Pleroma.Application do
task_children(@mix_env) ++
dont_run_in_test(@mix_env) ++
chat_child(chat_enabled?()) ++
- [
- Pleroma.Migrators.HashtagsTableMigrator,
- Pleroma.Gopher.Server
- ]
+ [Pleroma.Gopher.Server]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
@@ -231,6 +228,12 @@ defmodule Pleroma.Application do
keys: :duplicate,
partitions: System.schedulers_online()
+ ] ++ background_migrators()
+ end
+ defp background_migrators do
+ [
+ Pleroma.Migrators.HashtagsTableMigrator
diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
index 6123c88e0..b84058e11 100644
--- a/lib/pleroma/migrators/hashtags_table_migrator.ex
+++ b/lib/pleroma/migrators/hashtags_table_migrator.ex
@@ -3,88 +3,27 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Migrators.HashtagsTableMigrator do
- use GenServer
+ defmodule State do
+ use Pleroma.Migrators.Support.BaseMigratorState
- require Logger
+ @impl Pleroma.Migrators.Support.BaseMigratorState
+ defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
+ end
- import Ecto.Query
+ use Pleroma.Migrators.Support.BaseMigrator
- alias __MODULE__.State
- alias Pleroma.Config
alias Pleroma.Hashtag
+ alias Pleroma.Migrators.Support.BaseMigrator
alias Pleroma.Object
- alias Pleroma.Repo
- defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
- defdelegate data_migration_id(), to: State
- defdelegate state(), to: State
- defdelegate persist_state(), to: State, as: :persist_to_db
- defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
- defdelegate put_stat(key, value), to: State, as: :put_data_key
- defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
- @feature_config_path [:features, :improved_hashtag_timeline]
- @reg_name {:global, __MODULE__}
- def whereis, do: GenServer.whereis(@reg_name)
- def feature_state, do: Config.get(@feature_config_path)
- def start_link(_) do
- case whereis() do
- nil ->
- GenServer.start_link(__MODULE__, nil, name: @reg_name)
- pid ->
- {:ok, pid}
- end
- end
- @impl true
- def init(_) do
- {:ok, nil, {:continue, :init_state}}
- end
- @impl true
- def handle_continue(:init_state, _state) do
- {:ok, _} = State.start_link(nil)
+ @impl BaseMigrator
+ def feature_config_path, do: [:features, :improved_hashtag_timeline]
- data_migration = data_migration()
- manual_migrations = Config.get([:instance, :manual_data_migrations], [])
- cond do
- Config.get(:env) == :test ->
- update_status(:noop)
- is_nil(data_migration) ->
- message = "Data migration does not exist."
- update_status(:failed, message)
- Logger.error("#{__MODULE__}: #{message}")
- data_migration.state == :manual or data_migration.name in manual_migrations ->
- message = "Data migration is in manual execution or manual fix mode."
- update_status(:manual, message)
- Logger.warn("#{__MODULE__}: #{message}")
- data_migration.state == :complete ->
- on_complete(data_migration)
- true ->
- send(self(), :migrate_hashtags)
- end
- {:noreply, nil}
- end
- @impl true
- def handle_info(:migrate_hashtags, state) do
- State.reinit()
- update_status(:running)
- put_stat(:iteration_processed_count, 0)
- put_stat(:started_at, NaiveDateTime.utc_now())
+ @impl BaseMigrator
+ def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
+ @impl BaseMigrator
+ def perform do
data_migration_id = data_migration_id()
max_processed_id = get_stat(:max_processed_id, 0)
@@ -103,7 +42,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
|> Enum.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1))
- # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
+ # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags
chunk_affected_count =
results
|> Enum.filter(&(elem(&1, 0) == :ok))
@@ -140,84 +79,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
- fault_rate = fault_rate()
- put_stat(:fault_rate, fault_rate)
- fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
- cond do
- fault_rate == 0 ->
- set_complete()
- is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
- message = """
- Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
- Putting data migration to manual fix mode. Check `retry_failed/0`.
- """
- Logger.warn("#{__MODULE__}: #{message}")
- update_status(:manual, message)
- on_complete(data_migration())
- true ->
- message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
- Logger.error("#{__MODULE__}: #{message}")
- update_status(:failed, message)
- end
- persist_state()
- {:noreply, state}
- end
- def fault_rate do
- with failures_count when is_integer(failures_count) <- failures_count() do
- failures_count / Enum.max([get_stat(:affected_count, 0), 1])
- else
- _ -> :error
- end
- end
- defp records_per_second do
- get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
- end
- defp running_time do
- NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now()))
- end
- @hashtags_objects_cleanup_query """
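- DELETE FROM hashtags_objects WHERE object_id IN
- (SELECT DISTINCT objects.id FROM objects
- JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
- ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
- (objects.data->>'id')
- AND activities.data->>'type' = 'Create'
- WHERE activities.id IS NULL);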
- """
- @hashtags_cleanup_query """
- DELETE FROM hashtags WHERE id IN
- (SELECT hashtags.id FROM hashtags
- LEFT OUTER JOIN hashtags_objects
- ON hashtags_objects.hashtag_id = hashtags.id
- WHERE hashtags_objects.hashtag_id IS NULL);
- """
- @doc """
- Deletes `hashtags_objects` for legacy objects not asoociated with Create activity.
- Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
- """
- def delete_non_create_activities_hashtags do
- {:ok, %{num_rows: hashtags_objects_count}} =
- Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
- {:ok, %{num_rows: hashtags_count}} =
- Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)
- {:ok, hashtags_objects_count, hashtags_count}
- defp query do
+ @impl BaseMigrator
+ def query do
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
# Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
@@ -276,54 +141,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
- @doc "Approximate count for current iteration (including processed records count)"
- def count(force \\ false, timeout \\ :infinity) do
- stored_count = get_stat(:count)
- if stored_count && !force do
- stored_count
- else
- processed_count = get_stat(:processed_count, 0)
- max_processed_id = get_stat(:max_processed_id, 0)
- query = where(query(), [object], object.id > ^max_processed_id)
- count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
- put_stat(:count, count)
- persist_state()
- count
- end
- end
- defp on_complete(data_migration) do
- if data_migration.feature_lock || feature_state() == :disabled do
- Logger.warn("#{__MODULE__}: migration complete but feature is locked; consider enabling.")
- :noop
- else
- Config.put(@feature_config_path, :enabled)
- :ok
- end
- end
- def failed_objects_query do
- from(o in Object)
- |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
- on: dmf.record_id == o.id
- )
- |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
- |> order_by([o], asc: o.id)
- end
- def failures_count do
- with {:ok, %{rows: [[count]]}} <-
- Repo.query(
- "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
- [data_migration_id()]
- ) do
- count
- end
- end
+ @impl BaseMigrator
def retry_failed do
data_migration_id = data_migration_id()
@@ -347,23 +165,44 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
- def force_continue do
- send(whereis(), :migrate_hashtags)
+ defp failed_objects_query do
+ from(o in Object)
+ |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+ on: dmf.record_id == o.id
+ )
+ |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
+ |> order_by([o], asc: o.id)
- def force_restart do
- :ok = State.reset()
- force_continue()
- end
+ @doc """
+ Service func to delete `hashtags_objects` for legacy objects not associated with Create activity.
+ Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
+ """
+ def delete_non_create_activities_hashtags do
+ hashtags_objects_cleanup_query = """
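+ DELETE FROM hashtags_objects WHERE object_id IN
+ (SELECT DISTINCT objects.id FROM objects
+ JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
+ ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
+ (objects.data->>'id')
+ AND activities.data->>'type' = 'Create'
+ WHERE activities.id IS NULL);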
+ """
+ hashtags_cleanup_query = """
+ DELETE FROM hashtags WHERE id IN
+ (SELECT hashtags.id FROM hashtags
+ LEFT OUTER JOIN hashtags_objects
+ ON hashtags_objects.hashtag_id = hashtags.id
+ WHERE hashtags_objects.hashtag_id IS NULL);
+ """
- def set_complete do
- update_status(:complete)
- persist_state()
- on_complete(data_migration())
- end
+ {:ok, %{num_rows: hashtags_objects_count}} =
+ Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity)
- defp update_status(status, message \\ nil) do
- put_stat(:state, status)
- put_stat(:message, message)
+ {:ok, %{num_rows: hashtags_count}} =
+ Repo.query(hashtags_cleanup_query, [], timeout: :infinity)
+ {:ok, hashtags_objects_count, hashtags_count}
diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex
deleted file mode 100644
index ee0009b2e..000000000
--- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex
+++ /dev/null
@@ -1,104 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
-# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
- use Agent
- alias Pleroma.DataMigration
- defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator
- @reg_name {:global, __MODULE__}
- def start_link(_) do
- Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
- end
- defp load_state_from_db do
- data_migration = data_migration()
- data =
- if data_migration do
- Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
- else
- %{}
- end
- %{
- data_migration_id: data_migration && data_migration.id,
- data: data
- }
- end
- def persist_to_db do
- %{data_migration_id: data_migration_id, data: data} = state()
- if data_migration_id do
- DataMigration.update_one_by_id(data_migration_id, data: data)
- else
- {:error, :nil_data_migration_id}
- end
- end
- def reset do
- %{data_migration_id: data_migration_id} = state()
- with false <- is_nil(data_migration_id),
- :ok <-
- DataMigration.update_one_by_id(data_migration_id,
- state: :pending,
- data: %{}
- ) do
- reinit()
- else
- true -> {:error, :nil_data_migration_id}
- e -> e
- end
- end
- def reinit do
- Agent.update(@reg_name, fn _state -> load_state_from_db() end)
- end
- def state do
- Agent.get(@reg_name, & &1)
- end
- def get_data_key(key, default \\ nil) do
- get_in(state(), [:data, key]) || default
- end
- def put_data_key(key, value) do
- _ = persist_non_data_change(key, value)
- Agent.update(@reg_name, fn state ->
- put_in(state, [:data, key], value)
- end)
- end
- def increment_data_key(key, increment \\ 1) do
- Agent.update(@reg_name, fn state ->
- initial_value = get_in(state, [:data, key]) || 0
- updated_value = initial_value + increment
- put_in(state, [:data, key], updated_value)
- end)
- end
- defp persist_non_data_change(:state, value) do
- with true <- get_data_key(:state) != value,
- true <- value in Pleroma.DataMigration.State.__valid_values__(),
- %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do
- DataMigration.update_one_by_id(data_migration_id, state: value)
- else
- false -> :ok
- _ -> {:error, :nil_data_migration_id}
- end
- end
- defp persist_non_data_change(_, _) do
- nil
- end
- def data_migration_id, do: Map.get(state(), :data_migration_id)
diff --git a/lib/pleroma/migrators/support/base_migrator.ex b/lib/pleroma/migrators/support/base_migrator.ex
new file mode 100644
index 000000000..1f8a5402b
--- /dev/null
+++ b/lib/pleroma/migrators/support/base_migrator.ex
@@ -0,0 +1,210 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Migrators.Support.BaseMigrator do
+ @moduledoc """
+ Base background migrator functionality.
+ """
+ @callback perform() :: any()
+ @callback retry_failed() :: any()
+ @callback feature_config_path() :: list(atom())
+ @callback query() :: Ecto.Query.t()
+ @callback fault_rate_allowance() :: integer() | float()
+ defmacro __using__(_opts) do
+ quote do
+ use GenServer
+ require Logger
+ import Ecto.Query
+ alias __MODULE__.State
+ alias Pleroma.Config
+ alias Pleroma.Repo
+ @behaviour Pleroma.Migrators.Support.BaseMigrator
+ defdelegate data_migration(), to: State
+ defdelegate data_migration_id(), to: State
+ defdelegate state(), to: State
+ defdelegate persist_state(), to: State, as: :persist_to_db
+ defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
+ defdelegate put_stat(key, value), to: State, as: :put_data_key
+ defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
+ @reg_name {:global, __MODULE__}
+ def whereis, do: GenServer.whereis(@reg_name)
+ def start_link(_) do
+ case whereis() do
+ nil ->
+ GenServer.start_link(__MODULE__, nil, name: @reg_name)
+ pid ->
+ {:ok, pid}
+ end
+ end
+ @impl true
+ def init(_) do
+ {:ok, nil, {:continue, :init_state}}
+ end
+ @impl true
+ def handle_continue(:init_state, _state) do
+ {:ok, _} = State.start_link(nil)
+ data_migration = data_migration()
+ manual_migrations = Config.get([:instance, :manual_data_migrations], [])
+ cond do
+ Config.get(:env) == :test ->
+ update_status(:noop)
+ is_nil(data_migration) ->
+ message = "Data migration does not exist."
+ update_status(:failed, message)
+ Logger.error("#{__MODULE__}: #{message}")
+ data_migration.state == :manual or data_migration.name in manual_migrations ->
+ message = "Data migration is in manual execution or manual fix mode."
+ update_status(:manual, message)
+ Logger.warn("#{__MODULE__}: #{message}")
+ data_migration.state == :complete ->
+ on_complete(data_migration)
+ true ->
+ send(self(), :perform)
+ end
+ {:noreply, nil}
+ end
+ @impl true
+ def handle_info(:perform, state) do
+ State.reinit()
+ update_status(:running)
+ put_stat(:iteration_processed_count, 0)
+ put_stat(:started_at, NaiveDateTime.utc_now())
+ perform()
+ fault_rate = fault_rate()
+ put_stat(:fault_rate, fault_rate)
+ fault_rate_allowance = fault_rate_allowance()
+ cond do
+ fault_rate == 0 ->
+ set_complete()
+ is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
+ message = """
+ Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
+ Putting data migration to manual fix mode. Try running `#{__MODULE__}.retry_failed/0`.
+ """
+ Logger.warn("#{__MODULE__}: #{message}")
+ update_status(:manual, message)
+ on_complete(data_migration())
+ true ->
+ message = "Too many failures. Try running `#{__MODULE__}.retry_failed/0`."
+ Logger.error("#{__MODULE__}: #{message}")
+ update_status(:failed, message)
+ end
+ persist_state()
+ {:noreply, state}
+ end
+ defp on_complete(data_migration) do
+ if data_migration.feature_lock || feature_state() == :disabled do
+ Logger.warn(
+ "#{__MODULE__}: migration complete but feature is locked; consider enabling."
+ )
+ :noop
+ else
+ Config.put(feature_config_path(), :enabled)
+ :ok
+ end
+ end
+ @doc "Approximate count for current iteration (including processed records count)"
+ def count(force \\ false, timeout \\ :infinity) do
+ stored_count = get_stat(:count)
+ if stored_count && !force do
+ stored_count
+ else
+ processed_count = get_stat(:processed_count, 0)
+ max_processed_id = get_stat(:max_processed_id, 0)
+ query = where(query(), [entity], entity.id > ^max_processed_id)
+ count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
+ put_stat(:count, count)
+ persist_state()
+ count
+ end
+ end
+ def failures_count do
+ with {:ok, %{rows: [[count]]}} <-
+ Repo.query(
+ "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
+ [data_migration_id()]
+ ) do
+ count
+ end
+ end
+ def feature_state, do: Config.get(feature_config_path())
+ def force_continue do
+ send(whereis(), :perform)
+ end
+ def force_restart do
+ :ok = State.reset()
+ force_continue()
+ end
+ def set_complete do
+ update_status(:complete)
+ persist_state()
+ on_complete(data_migration())
+ end
+ defp update_status(status, message \\ nil) do
+ put_stat(:state, status)
+ put_stat(:message, message)
+ end
+ defp fault_rate do
+ with failures_count when is_integer(failures_count) <- failures_count() do
+ failures_count / Enum.max([get_stat(:affected_count, 0), 1])
+ else
+ _ -> :error
+ end
+ end
+ defp records_per_second do
+ get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
+ end
+ defp running_time do
+ NaiveDateTime.diff(
+ NaiveDateTime.utc_now(),
+ get_stat(:started_at, NaiveDateTime.utc_now())
+ )
+ end
+ end
+ end
diff --git a/lib/pleroma/migrators/support/base_migrator_state.ex b/lib/pleroma/migrators/support/base_migrator_state.ex
new file mode 100644
index 000000000..69724ae79
--- /dev/null
+++ b/lib/pleroma/migrators/support/base_migrator_state.ex
@@ -0,0 +1,116 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Migrators.Support.BaseMigratorState do
+ @moduledoc """
+ Base background migrator state functionality.
+ """
+ @callback data_migration() :: Pleroma.DataMigration.t()
+ defmacro __using__(_opts) do
+ quote do
+ use Agent
+ alias Pleroma.DataMigration
+ @behaviour Pleroma.Migrators.Support.BaseMigratorState
+ @reg_name {:global, __MODULE__}
+ def start_link(_) do
+ Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
+ end
+ def data_migration, do: raise("data_migration/0 is not implemented")
+ defoverridable data_migration: 0
+ defp load_state_from_db do
+ data_migration = data_migration()
+ data =
+ if data_migration do
+ Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
+ else
+ %{}
+ end
+ %{
+ data_migration_id: data_migration && data_migration.id,
+ data: data
+ }
+ end
+ def persist_to_db do
+ %{data_migration_id: data_migration_id, data: data} = state()
+ if data_migration_id do
+ DataMigration.update_one_by_id(data_migration_id, data: data)
+ else
+ {:error, :nil_data_migration_id}
+ end
+ end
+ def reset do
+ %{data_migration_id: data_migration_id} = state()
+ with false <- is_nil(data_migration_id),
+ :ok <-
+ DataMigration.update_one_by_id(data_migration_id,
+ state: :pending,
+ data: %{}
+ ) do
+ reinit()
+ else
+ true -> {:error, :nil_data_migration_id}
+ e -> e
+ end
+ end
+ def reinit do
+ Agent.update(@reg_name, fn _state -> load_state_from_db() end)
+ end
+ def state do
+ Agent.get(@reg_name, & &1)
+ end
+ def get_data_key(key, default \\ nil) do
+ get_in(state(), [:data, key]) || default
+ end
+ def put_data_key(key, value) do
+ _ = persist_non_data_change(key, value)
+ Agent.update(@reg_name, fn state ->
+ put_in(state, [:data, key], value)
+ end)
+ end
+ def increment_data_key(key, increment \\ 1) do
+ Agent.update(@reg_name, fn state ->
+ initial_value = get_in(state, [:data, key]) || 0
+ updated_value = initial_value + increment
+ put_in(state, [:data, key], updated_value)
+ end)
+ end
+ defp persist_non_data_change(:state, value) do
+ with true <- get_data_key(:state) != value,
+ true <- value in Pleroma.DataMigration.State.__valid_values__(),
+ %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do
+ DataMigration.update_one_by_id(data_migration_id, state: value)
+ else
+ false -> :ok
+ _ -> {:error, :nil_data_migration_id}
+ end
+ end
+ defp persist_non_data_change(_, _) do
+ nil
+ end
+ def data_migration_id, do: Map.get(state(), :data_migration_id)
+ end
+ end