From a0977a1f1e450653bdc48630e9aada550dea1793 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Thu, 13 Feb 2020 15:45:35 +0300 Subject: optimization for group reports --- benchmarks/load_testing/fetcher.ex | 70 ++++++++----- benchmarks/load_testing/generator.ex | 48 +++++++++ benchmarks/mix/tasks/pleroma/load_testing.ex | 7 +- lib/pleroma/activity.ex | 30 +++++- lib/pleroma/user.ex | 5 + lib/pleroma/web/activity_pub/activity_pub.ex | 10 +- lib/pleroma/web/activity_pub/utils.ex | 118 +++++++++++++++++++++- lib/pleroma/web/activity_pub/views/user_view.ex | 4 + lib/pleroma/web/admin_api/admin_api_controller.ex | 6 ++ lib/pleroma/web/admin_api/views/report_view.ex | 36 +++++++ lib/pleroma/web/router.ex | 3 +- test/web/admin_api/admin_api_controller_test.exs | 89 ++++++++++++++++ 12 files changed, 392 insertions(+), 34 deletions(-) diff --git a/benchmarks/load_testing/fetcher.ex b/benchmarks/load_testing/fetcher.ex index a45a71d4a..1b71db076 100644 --- a/benchmarks/load_testing/fetcher.ex +++ b/benchmarks/load_testing/fetcher.ex @@ -98,33 +98,35 @@ def query_timelines(user) do end, "Rendering favorites timeline" => fn -> conn = Phoenix.ConnTest.build_conn(:get, "http://localhost:4001/api/v1/favourites", nil) - Pleroma.Web.MastodonAPI.StatusController.favourites( - %Plug.Conn{conn | - assigns: %{user: user}, - query_params: %{"limit" => "0"}, - body_params: %{}, - cookies: %{}, - params: %{}, - path_params: %{}, - private: %{ - Pleroma.Web.Router => {[], %{}}, - phoenix_router: Pleroma.Web.Router, - phoenix_action: :favourites, - phoenix_controller: Pleroma.Web.MastodonAPI.StatusController, - phoenix_endpoint: Pleroma.Web.Endpoint, - phoenix_format: "json", - phoenix_layout: {Pleroma.Web.LayoutView, "app.html"}, - phoenix_recycled: true, - phoenix_view: Pleroma.Web.MastodonAPI.StatusView, - plug_session: %{"user_id" => user.id}, - plug_session_fetch: :done, - plug_session_info: :write, - plug_skip_csrf_protection: true - } + Pleroma.Web.MastodonAPI.StatusController.favourites( + %Plug.Conn{ + conn + | assigns: %{user: user}, + query_params: %{"limit" => "0"}, + body_params: %{}, + cookies: %{}, + params: %{}, + path_params: %{}, + private: %{ + Pleroma.Web.Router => {[], %{}}, + phoenix_router: Pleroma.Web.Router, + phoenix_action: :favourites, + phoenix_controller: Pleroma.Web.MastodonAPI.StatusController, + phoenix_endpoint: Pleroma.Web.Endpoint, + phoenix_format: "json", + phoenix_layout: {Pleroma.Web.LayoutView, "app.html"}, + phoenix_recycled: true, + phoenix_view: Pleroma.Web.MastodonAPI.StatusView, + plug_session: %{"user_id" => user.id}, + plug_session_fetch: :done, + plug_session_info: :write, + plug_skip_csrf_protection: true + } }, - %{}) - end, + %{} + ) + end }) end @@ -257,4 +259,22 @@ def query_long_thread(user, activity) do end }) end + + def query_flags_group_reports do + statuses = Pleroma.Web.ActivityPub.Utils.get_reported_activities() + + Benchee.run(%{ + "Old method" => fn -> + Pleroma.Web.AdminAPI.ReportView.render( + "index_grouped.json", + Pleroma.Web.ActivityPub.Utils.get_reports_grouped_by_status(statuses) + ) + end, + "New method" => fn -> + Pleroma.Web.AdminAPI.ReportView.render("index_grouped_new.json", %{ + groups: Pleroma.Web.ActivityPub.Utils.get_grouped_reports() + }) + end + }) + end end diff --git a/benchmarks/load_testing/generator.ex b/benchmarks/load_testing/generator.ex index 3f88fefd7..303e2c483 100644 --- a/benchmarks/load_testing/generator.ex +++ b/benchmarks/load_testing/generator.ex @@ -1,5 +1,8 @@ defmodule Pleroma.LoadTesting.Generator do use Pleroma.LoadTesting.Helper + + import Ecto.Query + alias Pleroma.Web.CommonAPI def generate_like_activities(user, posts) do @@ -406,4 +409,49 @@ defp do_generate_non_visible_post(not_friend, users) do CommonAPI.post(user, post) end) end + + def generate_flags(remote_users, users) do + IO.puts("Starting generating 100 flag activities...") + + {time, _} = + :timer.tc(fn -> + do_generate_flags(remote_users, users) + end) + + IO.puts("Inserting flag activities take #{to_sec(time)} sec.\n") + end + + defp do_generate_flags(remote_users, users) do + Task.async_stream( + 1..100, + fn _ -> + do_generate_flag(Enum.random(remote_users), Enum.random(users)) + end, + max_concurrency: 30, + timeout: 30_000 + ) + |> Stream.run() + end + + defp do_generate_flag(actor, user) do + limit = Enum.random(1..3) + + activities = + from(a in Pleroma.Activity, + where: a.local == true, + where: a.actor == ^user.ap_id, + order_by: fragment("RANDOM()"), + limit: ^limit + ) + |> Repo.all() + + Pleroma.Web.ActivityPub.ActivityPub.flag(%{ + context: Pleroma.Web.ActivityPub.Utils.generate_context_id(), + actor: actor, + account: user, + statuses: activities, + content: "Some content", + forward: false + }) + end end diff --git a/benchmarks/mix/tasks/pleroma/load_testing.ex b/benchmarks/mix/tasks/pleroma/load_testing.ex index 0a751adac..330be2fd7 100644 --- a/benchmarks/mix/tasks/pleroma/load_testing.ex +++ b/benchmarks/mix/tasks/pleroma/load_testing.ex @@ -101,7 +101,8 @@ def run(args) do generate_remote_activities(user, remote_users) generate_like_activities( - user, Pleroma.Repo.all(Pleroma.Activity.Queries.by_type("Create")) + user, + Pleroma.Repo.all(Pleroma.Activity.Queries.by_type("Create")) ) generate_dms(user, users, opts) @@ -110,6 +111,8 @@ def run(args) do generate_non_visible_message(user, users) + generate_flags(remote_users, users) + IO.puts("Users in DB: #{Repo.aggregate(from(u in User), :count, :id)}") IO.puts("Activities in DB: #{Repo.aggregate(from(a in Pleroma.Activity), :count, :id)}") @@ -127,6 +130,7 @@ def run(args) do query_long_thread(user, activity) Pleroma.Config.put([:instance, :skip_thread_containment], false) query_timelines(user) + query_flags_group_reports() end defp clean_tables do @@ -134,5 +138,6 @@ defp clean_tables do Ecto.Adapters.SQL.query!(Repo, "TRUNCATE users CASCADE;") Ecto.Adapters.SQL.query!(Repo, "TRUNCATE activities CASCADE;") Ecto.Adapters.SQL.query!(Repo, "TRUNCATE objects CASCADE;") + Ecto.Adapters.SQL.query!(Repo, "TRUNCATE oban_jobs CASCADE;") end end diff --git a/lib/pleroma/activity.ex b/lib/pleroma/activity.ex index 72e2256ea..95ca3fc2e 100644 --- a/lib/pleroma/activity.ex +++ b/lib/pleroma/activity.ex @@ -190,9 +190,33 @@ def get_by_id_with_object(id) do |> Repo.one() end + def all_by_ids(ids) do + ids + |> all_by_ids_query() + |> Repo.all() + end + + def all_by_ids_query(query \\ Activity, ids) do + from(a in query, where: a.id in ^ids) + end + def all_by_ids_with_object(ids) do - Activity - |> where([a], a.id in ^ids) + ids + |> all_by_ids_query() + |> with_preloaded_object() + |> Repo.all() + end + + def all_by_ap_ids_query(query \\ Activity, ap_ids) do + from( + a in query, + where: fragment("(?)->>'id' = ANY(?)", a.data, ^ap_ids) + ) + end + + def all_by_ap_ids_with_object(ap_ids) do + ap_ids + |> all_by_ap_ids_query() |> with_preloaded_object() |> Repo.all() end @@ -330,4 +354,6 @@ def direct_conversation_id(activity, for_user) do _ -> nil end end + + def flags_activities_query(query \\ Activity), do: Queries.by_type(query, "Flag") end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 5ea36fea3..2bca398b7 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -713,6 +713,11 @@ def get_all_by_ids(ids) do |> Repo.all() end + def get_all_by_ap_ids(ap_ids) do + from(u in __MODULE__, where: u.ap_id in ^ap_ids) + |> Repo.all() + end + # This is mostly an SPC migration fix. This guesses the user nickname by taking the last part # of the ap_id and the domain and tries to get that user def get_by_guessed_nickname(ap_id) do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 5c436941a..0c2d4d6eb 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -236,7 +236,7 @@ def create(%{to: to, actor: actor, context: context, object: object} = params, f # only accept false as false value local = !(params[:local] == false) published = params[:published] - quick_insert? = Pleroma.Config.get([:env]) == :benchmark + quick_insert? = Config.get([:env]) == :benchmark with create_data <- make_create_data( @@ -525,6 +525,8 @@ def flag( additional = params[:additional] || %{} + quick_insert? = Config.get(:env) == :benchmark + additional = if forward do Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]}) @@ -534,6 +536,7 @@ def flag( with flag_data <- make_flag_data(params, additional), {:ok, activity} <- insert(flag_data, local), + {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity}, {:ok, stripped_activity} <- strip_report_status_data(activity), :ok <- maybe_federate(stripped_activity) do Enum.each(User.all_superusers(), fn superuser -> @@ -543,6 +546,9 @@ def flag( end) {:ok, activity} + else + {:quick_insert, true, activity} -> {:ok, activity} + e -> e end end @@ -1321,7 +1327,7 @@ defp normalize_counter(_), do: 0 defp maybe_update_follow_information(data) do with {:enabled, true} <- - {:enabled, Pleroma.Config.get([:instance, :external_user_synchronization])}, + {:enabled, Config.get([:instance, :external_user_synchronization])}, {:ok, info} <- fetch_follow_information_for_user(data) do info = Map.merge(data[:info] || %{}, info) Map.put(data, :info, info) diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index 10ce5eee8..750d15ac6 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -907,12 +907,124 @@ def get_reports_grouped_by_status(activity_ids) do } end - @spec get_reported_activities() :: [ + @spec get_grouped_reports() :: [ %{ - required(:activity) => String.t(), - required(:date) => String.t() + status: Activity.t(), + account: User.t(), + actors: [User.t()], + reports: [Activity.t()] } ] + def get_grouped_reports do + reports = Activity.flags_activities_query() |> Repo.all() + + flags = + Enum.map(reports, fn %{ + id: id, + actor: actor, + data: %{"object" => [account | statuses], "published" => date} + } -> + flag = %{ + report_id: id, + actor: actor, + account: account, + date: date + } + + Enum.map(statuses, fn + status when is_map(status) -> + Map.put(flag, :id, status["id"]) + + activity_id when is_binary(activity_id) -> + Map.put(flag, :id, activity_id) + end) + end) + + ids = %{accounts: [], actors: [], reports: []} + + {ids, groups} = + flags + |> List.flatten() + |> Enum.reduce({ids, %{}}, fn status, {ids, acc} -> + acc = + Map.update( + acc, + status.id, + %{ + account: status.account, + actors: [status.actor], + reports: [status.report_id], + date: status.date + }, + &update_reported_group(&1, status) + ) + + ids = + ids + |> Map.put(:accounts, [status.account | ids.accounts]) + |> Map.put(:actors, [status.actor | ids.actors]) + |> Map.put(:reports, [status.report_id | ids.reports]) + + {ids, acc} + end) + + loaded_activities = + groups + |> Map.keys() + |> Activity.all_by_ap_ids_with_object() + |> Enum.reduce(%{}, fn activity, acc -> + Map.put(acc, activity.data["id"], activity) + end) + + loaded_users = + (ids.accounts ++ ids.actors) + |> Enum.uniq() + |> User.get_all_by_ap_ids() + |> Enum.reduce(%{}, fn user, acc -> Map.put(acc, user.ap_id, user) end) + + loaded_reports = + reports + |> Enum.reduce(%{}, fn report, acc -> Map.put(acc, report.id, report) end) + + Enum.map(groups, fn {activity_id, group} -> + updated_actors = + group.actors + |> Enum.uniq() + |> Enum.map(&Map.get(loaded_users, &1)) + + updated_reports = + group.reports + |> Enum.uniq() + |> Enum.map(&Map.get(loaded_reports, &1)) + + group + |> Map.put( + :status, + Map.get(loaded_activities, activity_id, %{"id" => activity_id, "deleted" => true}) + ) + |> Map.put( + :account, + Map.get(loaded_users, group.account, %{"id" => group.account, "deleted" => true}) + ) + |> Map.put(:actors, updated_actors) + |> Map.put(:reports, updated_reports) + end) + end + + defp update_reported_group(group, status) do + if NaiveDateTime.compare( + NaiveDateTime.from_iso8601!(status.date), + NaiveDateTime.from_iso8601!(group.date) + ) == :gt do + Map.put(group, :date, status.date) + else + group + end + |> Map.put(:actors, [status.actor | group.actors]) + |> Map.put(:reports, [status.report_id | group.reports]) + end + + @spec get_reported_activities() :: [String.t()] def get_reported_activities do reported_activities_query = from(a in Activity, diff --git a/lib/pleroma/web/activity_pub/views/user_view.ex b/lib/pleroma/web/activity_pub/views/user_view.ex index 350c4391d..84b121ec6 100644 --- a/lib/pleroma/web/activity_pub/views/user_view.ex +++ b/lib/pleroma/web/activity_pub/views/user_view.ex @@ -61,6 +61,10 @@ def render("service.json", %{user: user}) do |> Map.merge(Utils.make_json_ld_header()) end + def render("users.json", %{users: users}) do + render_many(users, Pleroma.Web.ActivityPub.UserView, "user.json") + end + # the instance itself is not a Person, but instead an Application def render("user.json", %{user: %User{nickname: nil} = user}), do: render("service.json", %{user: user}) diff --git a/lib/pleroma/web/admin_api/admin_api_controller.ex b/lib/pleroma/web/admin_api/admin_api_controller.ex index c95cd182d..245d820fe 100644 --- a/lib/pleroma/web/admin_api/admin_api_controller.ex +++ b/lib/pleroma/web/admin_api/admin_api_controller.ex @@ -670,6 +670,12 @@ def list_grouped_reports(conn, _params) do |> render("index_grouped.json", Utils.get_reports_grouped_by_status(statuses)) end + def list_grouped_reports_new(conn, _params) do + conn + |> put_view(ReportView) + |> render("index_grouped_new.json", %{groups: Utils.get_grouped_reports()}) + end + def report_show(conn, %{"id" => id}) do with %Activity{} = report <- Activity.get_by_id(id) do conn diff --git a/lib/pleroma/web/admin_api/views/report_view.ex b/lib/pleroma/web/admin_api/views/report_view.ex index 4880d2992..d27f1a21b 100644 --- a/lib/pleroma/web/admin_api/views/report_view.ex +++ b/lib/pleroma/web/admin_api/views/report_view.ex @@ -44,6 +44,42 @@ def render("show.json", %{report: report, user: user, account: account, statuses } end + def render("index_grouped_new.json", %{groups: groups}) do + updated = + Enum.map(groups, fn report -> + status = + case report.status do + %Activity{} = activity -> StatusView.render("show.json", %{activity: activity}) + _ -> report.status + end + |> Map.put_new("deleted", false) + + report + |> Map.replace!( + :actors, + Enum.map(report[:actors], &merge_account_views/1) + ) + |> Map.replace!( + :account, + merge_account_views(report[:account]) + ) + |> Map.replace!( + :status, + status + ) + |> Map.replace!( + :reports, + report[:reports] + |> Enum.map(&Report.extract_report_info(&1)) + |> Enum.map(&render(__MODULE__, "show.json", &1)) + ) + end) + + %{ + reports: updated + } + end + def render("index_grouped.json", %{groups: groups}) do reports = Enum.map(groups, fn group -> diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 897215698..5b5facda8 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -184,7 +184,8 @@ defmodule Pleroma.Web.Router do patch("/users/resend_confirmation_email", AdminAPIController, :resend_confirmation_email) get("/reports", AdminAPIController, :list_reports) - get("/grouped_reports", AdminAPIController, :list_grouped_reports) + get("/grouped_reports", AdminAPIController, :list_grouped_reports_new) + get("/grouped_reports_new", AdminAPIController, :list_grouped_reports_new) get("/reports/:id", AdminAPIController, :report_show) patch("/reports", AdminAPIController, :reports_update) post("/reports/:id/notes", AdminAPIController, :report_notes_create) diff --git a/test/web/admin_api/admin_api_controller_test.exs b/test/web/admin_api/admin_api_controller_test.exs index 5fbdf96f6..b7a5f54dd 100644 --- a/test/web/admin_api/admin_api_controller_test.exs +++ b/test/web/admin_api/admin_api_controller_test.exs @@ -1640,6 +1640,95 @@ test "returns 403 when requested by anonymous" do } end + test "returns reports grouped by status new", %{ + conn: conn, + first_status: first_status, + second_status: second_status, + third_status: third_status, + first_status_reports: first_status_reports, + second_status_reports: second_status_reports, + third_status_reports: third_status_reports, + target_user: target_user, + reporter: reporter + } do + response = + conn + |> get("/api/pleroma/admin/grouped_reports_new") + |> json_response(:ok) + + assert length(response["reports"]) == 3 + + first_group = Enum.find(response["reports"], &(&1["status"]["id"] == first_status.id)) + + second_group = Enum.find(response["reports"], &(&1["status"]["id"] == second_status.id)) + + third_group = Enum.find(response["reports"], &(&1["status"]["id"] == third_status.id)) + + assert length(first_group["reports"]) == 3 + assert length(second_group["reports"]) == 2 + assert length(third_group["reports"]) == 1 + + assert first_group["date"] == + Enum.max_by(first_status_reports, fn act -> + NaiveDateTime.from_iso8601!(act.data["published"]) + end).data["published"] + + assert first_group["status"] == + Map.put( + stringify_keys(StatusView.render("show.json", %{activity: first_status})), + "deleted", + false + ) + + assert(first_group["account"]["id"] == target_user.id) + + assert length(first_group["actors"]) == 1 + assert hd(first_group["actors"])["id"] == reporter.id + + assert Enum.map(first_group["reports"], & &1["id"]) -- + Enum.map(first_status_reports, & &1.id) == [] + + assert second_group["date"] == + Enum.max_by(second_status_reports, fn act -> + NaiveDateTime.from_iso8601!(act.data["published"]) + end).data["published"] + + assert second_group["status"] == + Map.put( + stringify_keys(StatusView.render("show.json", %{activity: second_status})), + "deleted", + false + ) + + assert second_group["account"]["id"] == target_user.id + + assert length(second_group["actors"]) == 1 + assert hd(second_group["actors"])["id"] == reporter.id + + assert Enum.map(second_group["reports"], & &1["id"]) -- + Enum.map(second_status_reports, & &1.id) == [] + + assert third_group["date"] == + Enum.max_by(third_status_reports, fn act -> + NaiveDateTime.from_iso8601!(act.data["published"]) + end).data["published"] + + assert third_group["status"] == + Map.put( + stringify_keys(StatusView.render("show.json", %{activity: third_status})), + "deleted", + false + ) + + assert third_group["account"]["id"] == target_user.id + + assert length(third_group["actors"]) == 1 + assert hd(third_group["actors"])["id"] == reporter.id + + assert Enum.map(third_group["reports"], & &1["id"]) -- + Enum.map(third_status_reports, & &1.id) == [] + end + test "returns reports grouped by status", %{ conn: conn, first_status: first_status, -- cgit v1.2.3