From b981edad8a7d8f27b231bc6164fc0546efbdb646 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 18 Feb 2021 20:40:10 +0300 Subject: [#3213] HashtagsTableMigrator: fault rate allowance to enable the feature (defaults to 1%), counting of affected objects, misc. tweaks. --- config/config.exs | 2 + config/description.exs | 7 ++ docs/configuration/cheatsheet.md | 1 + lib/pleroma/migrators/hashtags_table_migrator.ex | 101 +++++++++++++++------ .../migrators/hashtags_table_migrator/state.ex | 4 +- 5 files changed, 84 insertions(+), 31 deletions(-) diff --git a/config/config.exs b/config/config.exs index 0fbca06f3..c371c397c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -657,6 +657,8 @@ config :pleroma, :database, rum_enabled: false +config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01 + config :pleroma, :env, Mix.env() config :http_signatures, diff --git a/config/description.exs b/config/description.exs index 29fc5fbd4..6ffc71278 100644 --- a/config/description.exs +++ b/config/description.exs @@ -479,6 +479,13 @@ type: :group, description: "`populate_hashtags_table` background migration settings", children: [ + %{ + key: :fault_rate_allowance, + type: :float, + description: + "Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).", + suggestions: [0.01] + }, %{ key: :sleep_interval_ms, type: :integer, diff --git a/docs/configuration/cheatsheet.md b/docs/configuration/cheatsheet.md index 68a5a3c7f..6a1031f15 100644 --- a/docs/configuration/cheatsheet.md +++ b/docs/configuration/cheatsheet.md @@ -70,6 +70,7 @@ To add configuration to your config file, you can copy it from the base config. ## Background migrations * `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be keep default on most instances). +* `populate_hashtags_table/fault_rate_allowance`: Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records). ## Welcome * `direct_message`: - welcome message sent as a direct message. diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index ac17f91cc..45dab8470 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -15,7 +15,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do alias Pleroma.Object alias Pleroma.Repo - defdelegate data_migration(), to: State + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + defdelegate data_migration_id(), to: State defdelegate state(), to: State defdelegate persist_state(), to: State, as: :persist_to_db @@ -23,10 +24,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + @feature_config_path [:database, :improved_hashtag_timeline] @reg_name {:global, __MODULE__} def whereis, do: GenServer.whereis(@reg_name) + def feature_state, do: Config.get(@feature_config_path) + def start_link(_) do case whereis() do nil -> @@ -46,8 +50,6 @@ def init(_) do def handle_continue(:init_state, _state) do {:ok, _} = State.start_link(nil) - update_status(:pending) - data_migration = data_migration() manual_migrations = Config.get([:instance, :manual_data_migrations], []) @@ -56,10 +58,14 @@ def handle_continue(:init_state, _state) do update_status(:noop) is_nil(data_migration) -> - update_status(:failed, "Data migration does not exist.") + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") data_migration.state == :manual or data_migration.name in manual_migrations -> - update_status(:manual, "Data migration is in manual execution state.") + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") data_migration.state == :complete -> on_complete(data_migration) @@ -78,7 +84,7 @@ def handle_info(:migrate_hashtags, state) do update_status(:running) put_stat(:started_at, NaiveDateTime.utc_now()) - %{id: data_migration_id} = data_migration() + data_migration_id = data_migration_id() max_processed_id = get_stat(:max_processed_id, 0) Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") @@ -89,12 +95,19 @@ def handle_info(:migrate_hashtags, state) do |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) + results = Enum.map(objects, &transfer_object_hashtags(&1)) + failed_ids = - objects - |> Enum.map(&transfer_object_hashtags(&1)) + results |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) + # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags) + chunk_affected_count = + results + |> Enum.filter(&(elem(&1, 0) == :ok)) + |> length() + for failed_id <- failed_ids do _ = Repo.query( @@ -116,6 +129,7 @@ def handle_info(:migrate_hashtags, state) do put_stat(:max_processed_id, max_object_id) increment_stat(:processed_count, length(object_ids)) increment_stat(:failed_count, length(failed_ids)) + increment_stat(:affected_count, chunk_affected_count) put_stat(:records_per_second, records_per_second()) persist_state() @@ -125,17 +139,42 @@ def handle_info(:migrate_hashtags, state) do end) |> Stream.run() - with 0 <- failures_count(data_migration_id) do - _ = delete_non_create_activities_hashtags() - set_complete() - else - _ -> - update_status(:failed, "Please check data_migration_failed_ids records.") + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) + + cond do + fault_rate == 0 -> + set_complete() + + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Check `retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) end + persist_state() {:noreply, state} end + def fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + defp records_per_second do get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) end @@ -194,6 +233,7 @@ defp query do |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) end + @spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()} defp transfer_object_hashtags(object) do embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) @@ -201,7 +241,7 @@ defp transfer_object_hashtags(object) do if Enum.any?(hashtags) do transfer_object_hashtags(object, hashtags) else - {:ok, object.id} + {:noop, object.id} end end @@ -209,13 +249,11 @@ defp transfer_object_hashtags(object, hashtags) do Repo.transaction(fn -> with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) - expected_rows = length(hashtag_records) - - base_error = - "ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}" + base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}" try do - with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do + with {rows_count, _} when is_integer(rows_count) <- + Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do object.id else e -> @@ -260,11 +298,11 @@ defp on_complete(data_migration) do data_migration.feature_lock -> :noop - not is_nil(Config.get([:database, :improved_hashtag_timeline])) -> + not is_nil(feature_state()) -> :noop true -> - Config.put([:database, :improved_hashtag_timeline], true) + Config.put(@feature_config_path, true) :ok end end @@ -274,38 +312,41 @@ def failed_objects_query do |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), on: dmf.record_id == o.id ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) |> order_by([o], asc: o.id) end - def failures_count(data_migration_id \\ nil) do - data_migration_id = data_migration_id || data_migration().id - + def failures_count do with {:ok, %{rows: [[count]]}} <- Repo.query( "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id] + [data_migration_id()] ) do count end end def retry_failed do - data_migration = data_migration() + data_migration_id = data_migration_id() failed_objects_query() |> Repo.chunk_stream(100, :one) |> Stream.each(fn object -> - with {:ok, _} <- transfer_object_hashtags(object) do + with {res, _} when res != :error <- transfer_object_hashtags(object) do _ = Repo.query( "DELETE FROM data_migration_failed_ids " <> "WHERE data_migration_id = $1 AND record_id = $2", - [data_migration.id, object.id] + [data_migration_id, object.id] ) end end) |> Stream.run() + + put_stat(:failed_count, failures_count()) + persist_state() + + force_continue() end def force_continue do diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex index ed9848824..ee0009b2e 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator/state.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do alias Pleroma.DataMigration - defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table + defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator @reg_name {:global, __MODULE__} @@ -99,4 +99,6 @@ defp persist_non_data_change(:state, value) do defp persist_non_data_change(_, _) do nil end + + def data_migration_id, do: Map.get(state(), :data_migration_id) end -- cgit v1.2.3 From 998437d4a4111055e019f28dd84a8af1f9a27047 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Thu, 18 Feb 2021 21:03:06 +0300 Subject: [#3213] Experimental / debug feature: `database: [improved_hashtag_timeline: :preselect_hashtag_ids]`. --- lib/pleroma/web/activity_pub/activity_pub.ex | 47 +++++++++++++++++++++------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index e012f2779..5392ce7c9 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -787,19 +787,42 @@ defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do end defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do - from( - [_activity, object] in query, - where: - fragment( - """ - EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) - AND hashtags_objects.object_id = ? LIMIT 1) - """, - ^tags, - object.id + # TODO: refactor: debug / experimental feature + if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do + hashtag_ids = + from(ht in Pleroma.Hashtag, + where: fragment("name = ANY(?::citext[])", ^tags), + select: ht.id ) - ) + |> Repo.all() + + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS ( + SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1) + """, + ^hashtag_ids, + object.id + ) + ) + else + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[]) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id + ) + ) + end end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do -- cgit v1.2.3