summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Tashkinov <ivantashkinov@gmail.com>2021-02-16 23:14:15 +0300
committerIvan Tashkinov <ivantashkinov@gmail.com>2021-02-16 23:14:15 +0300
commit938823c73040f6b55896581daf5baf732f859f02 (patch)
tree73cdd1c1b4961267cb02e62707ea2b6c8c0e63dc
parent1dac7d14623f36744953a523650211540d90d1fc (diff)
[#3213] HashtagsTableMigrator state management refactoring & improvements (proper stats serialization etc.).
-rw-r--r--lib/pleroma/data_migration.ex15
-rw-r--r--lib/pleroma/migrators/hashtags_table_migrator.ex87
-rw-r--r--lib/pleroma/migrators/hashtags_table_migrator/state.ex87
3 files changed, 122 insertions, 67 deletions
diff --git a/lib/pleroma/data_migration.ex b/lib/pleroma/data_migration.ex
index 64fa155ff..1377af16e 100644
--- a/lib/pleroma/data_migration.ex
+++ b/lib/pleroma/data_migration.ex
@@ -10,6 +10,7 @@ defmodule Pleroma.DataMigration do
alias Pleroma.Repo
import Ecto.Changeset
+ import Ecto.Query
schema "data_migrations" do
field(:name, :string)
@@ -28,14 +29,12 @@ defmodule Pleroma.DataMigration do
|> unique_constraint(:name)
end
- def update(data_migration, params \\ %{}) do
- data_migration
- |> changeset(params)
- |> Repo.update()
- end
-
- def update_state(data_migration, new_state) do
- update(data_migration, %{state: new_state})
+ def update_one_by_id(id, params \\ %{}) do
+ with {1, _} <-
+ from(dm in DataMigration, where: dm.id == ^id)
+ |> Repo.update_all(set: params) do
+ :ok
+ end
end
def get_by_name(name) do
diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex
index 432c3401a..a226d9d29 100644
--- a/lib/pleroma/migrators/hashtags_table_migrator.ex
+++ b/lib/pleroma/migrators/hashtags_table_migrator.ex
@@ -11,16 +11,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
alias __MODULE__.State
alias Pleroma.Config
- alias Pleroma.DataMigration
alias Pleroma.Hashtag
alias Pleroma.Object
alias Pleroma.Repo
- defdelegate state(), to: State, as: :get
- defdelegate put_stat(key, value), to: State, as: :put
- defdelegate increment_stat(key, increment), to: State, as: :increment
+ defdelegate data_migration(), to: State
- defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+ defdelegate state(), to: State
+ defdelegate get_stat(key, value), to: State, as: :get_data_key
+ defdelegate put_stat(key, value), to: State, as: :put_data_key
+ defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
@reg_name {:global, __MODULE__}
@@ -45,7 +45,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
def handle_continue(:init_state, _state) do
{:ok, _} = State.start_link(nil)
- update_status(:init)
+ update_status(:pending)
data_migration = data_migration()
manual_migrations = Config.get([:instance, :manual_data_migrations], [])
@@ -55,13 +55,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
update_status(:noop)
is_nil(data_migration) ->
- update_status(:halt, "Data migration does not exist.")
+ update_status(:failed, "Data migration does not exist.")
data_migration.state == :manual or data_migration.name in manual_migrations ->
- update_status(:noop, "Data migration is in manual execution state.")
+ update_status(:manual, "Data migration is in manual execution state.")
data_migration.state == :complete ->
- handle_success(data_migration)
+ on_complete(data_migration)
true ->
send(self(), :migrate_hashtags)
@@ -72,20 +72,15 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
@impl true
def handle_info(:migrate_hashtags, state) do
- State.clear()
+ State.reinit()
update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now())
- data_migration = data_migration()
- persistent_data = Map.take(data_migration.data, ["max_processed_id"])
-
- {:ok, data_migration} =
- DataMigration.update(data_migration, %{state: :running, data: persistent_data})
-
- Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
+ %{id: data_migration_id} = data_migration()
+ max_processed_id = get_stat(:max_processed_id, 0)
- max_processed_id = data_migration.data["max_processed_id"] || 0
+ Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
query()
|> where([object], object.id > ^max_processed_id)
@@ -104,7 +99,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
Repo.query(
"INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
"VALUES ($1, $2) ON CONFLICT DO NOTHING;",
- [data_migration.id, failed_id]
+ [data_migration_id, failed_id]
)
end
@@ -112,7 +107,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
Repo.query(
"DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = ANY($2)",
- [data_migration.id, object_ids -- failed_ids]
+ [data_migration_id, object_ids -- failed_ids]
)
max_object_id = Enum.at(object_ids, -1)
@@ -120,14 +115,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
put_stat(:max_processed_id, max_object_id)
increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids))
-
- put_stat(
- :records_per_second,
- state()[:processed_count] /
- Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
- )
-
- persist_stats(data_migration)
+ put_stat(:records_per_second, records_per_second())
+ _ = State.persist_to_db()
# A quick and dirty approach to controlling the load this background migration imposes
sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
@@ -135,22 +124,25 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
end)
|> Stream.run()
- with 0 <- failures_count(data_migration.id) do
+ with 0 <- failures_count(data_migration_id) do
_ = delete_non_create_activities_hashtags()
-
- {:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
-
- handle_success(data_migration)
+ set_complete()
else
_ ->
- _ = DataMigration.update_state(data_migration, :failed)
-
update_status(:failed, "Please check data_migration_failed_ids records.")
end
{:noreply, state}
end
+ defp records_per_second do
+ get_stat(:processed_count, 0) / Enum.max([running_time(), 1])
+ end
+
+ defp running_time do
+ NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now()))
+ end
+
@hashtags_objects_cleanup_query """
DELETE FROM hashtags_objects WHERE object_id IN
(SELECT DISTINCT objects.id FROM objects
@@ -169,6 +161,10 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
WHERE hashtags_objects.hashtag_id IS NULL);
"""
+ @doc """
+ Deletes `hashtags_objects` for legacy objects not asoociated with Create activity.
+ Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
+ """
def delete_non_create_activities_hashtags do
{:ok, %{num_rows: hashtags_objects_count}} =
Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
@@ -256,14 +252,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
end
end
- defp persist_stats(data_migration) do
- runner_state = Map.drop(state(), [:status])
- _ = DataMigration.update(data_migration, %{data: runner_state})
- end
-
- defp handle_success(data_migration) do
- update_status(:complete)
-
+ defp on_complete(data_migration) do
cond do
data_migration.feature_lock ->
:noop
@@ -321,18 +310,18 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
end
def force_restart do
- {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
+ :ok = State.reset()
force_continue()
end
- def force_complete do
- {:ok, data_migration} = DataMigration.update_state(data_migration(), :complete)
-
- handle_success(data_migration)
+ def set_complete do
+ update_status(:complete)
+ _ = State.persist_to_db()
+ on_complete(data_migration())
end
defp update_status(status, message \\ nil) do
- put_stat(:status, status)
+ put_stat(:state, status)
put_stat(:message, message)
end
end
diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex
index 901563426..ed9848824 100644
--- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex
+++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex
@@ -5,31 +5,98 @@
defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
use Agent
- @init_state %{}
+ alias Pleroma.DataMigration
+
+ defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
+
@reg_name {:global, __MODULE__}
def start_link(_) do
- Agent.start_link(fn -> @init_state end, name: @reg_name)
+ Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
+ end
+
+ defp load_state_from_db do
+ data_migration = data_migration()
+
+ data =
+ if data_migration do
+ Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
+ else
+ %{}
+ end
+
+ %{
+ data_migration_id: data_migration && data_migration.id,
+ data: data
+ }
end
- def clear do
- Agent.update(@reg_name, fn _state -> @init_state end)
+ def persist_to_db do
+ %{data_migration_id: data_migration_id, data: data} = state()
+
+ if data_migration_id do
+ DataMigration.update_one_by_id(data_migration_id, data: data)
+ else
+ {:error, :nil_data_migration_id}
+ end
+ end
+
+ def reset do
+ %{data_migration_id: data_migration_id} = state()
+
+ with false <- is_nil(data_migration_id),
+ :ok <-
+ DataMigration.update_one_by_id(data_migration_id,
+ state: :pending,
+ data: %{}
+ ) do
+ reinit()
+ else
+ true -> {:error, :nil_data_migration_id}
+ e -> e
+ end
end
- def get do
+ def reinit do
+ Agent.update(@reg_name, fn _state -> load_state_from_db() end)
+ end
+
+ def state do
Agent.get(@reg_name, & &1)
end
- def put(key, value) do
+ def get_data_key(key, default \\ nil) do
+ get_in(state(), [:data, key]) || default
+ end
+
+ def put_data_key(key, value) do
+ _ = persist_non_data_change(key, value)
+
Agent.update(@reg_name, fn state ->
- Map.put(state, key, value)
+ put_in(state, [:data, key], value)
end)
end
- def increment(key, increment \\ 1) do
+ def increment_data_key(key, increment \\ 1) do
Agent.update(@reg_name, fn state ->
- updated_value = (state[key] || 0) + increment
- Map.put(state, key, updated_value)
+ initial_value = get_in(state, [:data, key]) || 0
+ updated_value = initial_value + increment
+ put_in(state, [:data, key], updated_value)
end)
end
+
+ defp persist_non_data_change(:state, value) do
+ with true <- get_data_key(:state) != value,
+ true <- value in Pleroma.DataMigration.State.__valid_values__(),
+ %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do
+ DataMigration.update_one_by_id(data_migration_id, state: value)
+ else
+ false -> :ok
+ _ -> {:error, :nil_data_migration_id}
+ end
+ end
+
+ defp persist_non_data_change(_, _) do
+ nil
+ end
end