summaryrefslogtreecommitdiff
path: root/benchmarks/load_testing/activities.ex
diff options
context:
space:
mode:
Diffstat (limited to 'benchmarks/load_testing/activities.ex')
-rw-r--r--benchmarks/load_testing/activities.ex595
1 files changed, 595 insertions, 0 deletions
diff --git a/benchmarks/load_testing/activities.ex b/benchmarks/load_testing/activities.ex
new file mode 100644
index 000000000..f5c7bfce8
--- /dev/null
+++ b/benchmarks/load_testing/activities.ex
@@ -0,0 +1,595 @@
+defmodule Pleroma.LoadTesting.Activities do
+ @moduledoc """
+ Module for generating different activities.
+ """
+ import Ecto.Query
+ import Pleroma.LoadTesting.Helper, only: [to_sec: 1]
+
+ alias Ecto.UUID
+ alias Pleroma.Constants
+ alias Pleroma.LoadTesting.Users
+ alias Pleroma.Repo
+ alias Pleroma.Web.CommonAPI
+
+ require Constants
+
+ @defaults [
+ iterations: 170,
+ friends_used: 20,
+ non_friends_used: 20
+ ]
+
+ @max_concurrency 10
+
+ @visibility ~w(public private direct unlisted)
+ @types [
+ :simple,
+ :simple_filtered,
+ :emoji,
+ :mentions,
+ :hell_thread,
+ :attachment,
+ :tag,
+ :like,
+ :reblog,
+ :simple_thread
+ ]
+ @groups [:friends_local, :friends_remote, :non_friends_local, :non_friends_local]
+ @remote_groups [:friends_remote, :non_friends_remote]
+ @friends_groups [:friends_local, :friends_remote]
+ @non_friends_groups [:non_friends_local, :non_friends_remote]
+
+ @spec generate(User.t(), keyword()) :: :ok
+ def generate(user, opts \\ []) do
+ {:ok, _} =
+ Agent.start_link(fn -> %{} end,
+ name: :benchmark_state
+ )
+
+ opts = Keyword.merge(@defaults, opts)
+
+ users = Users.prepare_users(user, opts)
+
+ {:ok, _} = Agent.start_link(fn -> users[:non_friends_remote] end, name: :non_friends_remote)
+
+ task_data =
+ for visibility <- @visibility,
+ type <- @types,
+ group <- [:user | @groups],
+ do: {visibility, type, group}
+
+ IO.puts("Starting generating #{opts[:iterations]} iterations of activities...")
+
+ public_long_thread = fn ->
+ generate_long_thread("public", users, opts)
+ end
+
+ private_long_thread = fn ->
+ generate_long_thread("private", users, opts)
+ end
+
+ iterations = opts[:iterations]
+
+ {time, _} =
+ :timer.tc(fn ->
+ Enum.each(
+ 1..iterations,
+ fn
+ i when i == iterations - 2 ->
+ spawn(public_long_thread)
+ spawn(private_long_thread)
+ generate_activities(users, Enum.shuffle(task_data), opts)
+
+ _ ->
+ generate_activities(users, Enum.shuffle(task_data), opts)
+ end
+ )
+ end)
+
+ IO.puts("Generating iterations of activities took #{to_sec(time)} sec.\n")
+ :ok
+ end
+
+ def generate_power_intervals(opts \\ []) do
+ count = Keyword.get(opts, :count, 20)
+ power = Keyword.get(opts, :power, 2)
+ IO.puts("Generating #{count} intervals for a power #{power} series...")
+ counts = Enum.map(1..count, fn n -> :math.pow(n, power) end)
+ sum = Enum.sum(counts)
+
+ densities =
+ Enum.map(counts, fn c ->
+ c / sum
+ end)
+
+ densities
+ |> Enum.reduce(0, fn density, acc ->
+ if acc == 0 do
+ [{0, density}]
+ else
+ [{_, lower} | _] = acc
+ [{lower, lower + density} | acc]
+ end
+ end)
+ |> Enum.reverse()
+ end
+
+ def generate_tagged_activities(opts \\ []) do
+ tag_count = Keyword.get(opts, :tag_count, 20)
+ users = Keyword.get(opts, :users, Repo.all(Pleroma.User))
+ activity_count = Keyword.get(opts, :count, 200_000)
+
+ intervals = generate_power_intervals(count: tag_count)
+
+ IO.puts(
+ "Generating #{activity_count} activities using #{tag_count} different tags of format `tag_n`, starting at tag_0"
+ )
+
+ Enum.each(1..activity_count, fn _ ->
+ random = :rand.uniform()
+ i = Enum.find_index(intervals, fn {lower, upper} -> lower <= random && upper > random end)
+ CommonAPI.post(Enum.random(users), %{status: "a post with the tag #tag_#{i}"})
+ end)
+ end
+
+ defp generate_long_thread(visibility, users, _opts) do
+ group =
+ if visibility == "public",
+ do: :friends_local,
+ else: :user
+
+ tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50)
+
+ {:ok, activity} =
+ CommonAPI.post(users[:user], %{
+ status: "Start of #{visibility} long thread",
+ visibility: visibility
+ })
+
+ Agent.update(:benchmark_state, fn state ->
+ key =
+ if visibility == "public",
+ do: :public_thread,
+ else: :private_thread
+
+ Map.put(state, key, activity)
+ end)
+
+ acc = {activity.id, ["@" <> users[:user].nickname, "reply to long thread"]}
+ insert_replies_for_long_thread(tasks, visibility, users, acc)
+ IO.puts("Generating #{visibility} long thread ended\n")
+ end
+
+ defp insert_replies_for_long_thread(tasks, visibility, users, acc) do
+ Enum.reduce(tasks, acc, fn
+ :user, {id, data} ->
+ user = users[:user]
+ insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility)
+
+ group, {id, data} ->
+ replier = Enum.random(users[group])
+ insert_reply(replier, List.delete(data, "@" <> replier.nickname), id, visibility)
+ end)
+ end
+
+ defp generate_activities(users, task_data, opts) do
+ Task.async_stream(
+ task_data,
+ fn {visibility, type, group} ->
+ insert_activity(type, visibility, group, users, opts)
+ end,
+ max_concurrency: @max_concurrency,
+ timeout: 30_000
+ )
+ |> Stream.run()
+ end
+
+ defp insert_local_activity(visibility, group, users, status) do
+ {:ok, _} =
+ group
+ |> get_actor(users)
+ |> CommonAPI.post(%{status: status, visibility: visibility})
+ end
+
+ defp insert_remote_activity(visibility, group, users, status) do
+ actor = get_actor(group, users)
+ {act_data, obj_data} = prepare_activity_data(actor, visibility, users[:user])
+ {activity_data, object_data} = other_data(actor, status)
+
+ activity_data
+ |> Map.merge(act_data)
+ |> Map.put("object", Map.merge(object_data, obj_data))
+ |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
+ end
+
+ defp user_mentions(users) do
+ user_mentions =
+ Enum.reduce(
+ @groups,
+ [],
+ fn group, acc ->
+ acc ++ get_random_mentions(users[group], Enum.random(0..2))
+ end
+ )
+
+ if Enum.random([true, false]),
+ do: ["@" <> users[:user].nickname | user_mentions],
+ else: user_mentions
+ end
+
+ defp hell_thread_mentions(users) do
+ with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
+ cached =
+ @groups
+ |> Enum.reduce([users[:user]], fn group, acc ->
+ acc ++ Enum.take(users[group], 5)
+ end)
+ |> Enum.map(&"@#{&1.nickname}")
+ |> Enum.join(", ")
+
+ Cachex.put(:user_cache, "hell_thread_mentions", cached)
+ cached
+ else
+ {:ok, cached} -> cached
+ end
+ end
+
+ defp insert_activity(:simple, visibility, group, users, _opts)
+ when group in @remote_groups do
+ insert_remote_activity(visibility, group, users, "Remote status")
+ end
+
+ defp insert_activity(:simple, visibility, group, users, _opts) do
+ insert_local_activity(visibility, group, users, "Simple status")
+ end
+
+ defp insert_activity(:simple_filtered, visibility, group, users, _opts)
+ when group in @remote_groups do
+ insert_remote_activity(visibility, group, users, "Remote status which must be filtered")
+ end
+
+ defp insert_activity(:simple_filtered, visibility, group, users, _opts) do
+ insert_local_activity(visibility, group, users, "Simple status which must be filtered")
+ end
+
+ defp insert_activity(:emoji, visibility, group, users, _opts)
+ when group in @remote_groups do
+ insert_remote_activity(visibility, group, users, "Remote status with emoji :firefox:")
+ end
+
+ defp insert_activity(:emoji, visibility, group, users, _opts) do
+ insert_local_activity(visibility, group, users, "Simple status with emoji :firefox:")
+ end
+
+ defp insert_activity(:mentions, visibility, group, users, _opts)
+ when group in @remote_groups do
+ mentions = user_mentions(users)
+
+ status = Enum.join(mentions, ", ") <> " remote status with mentions"
+
+ insert_remote_activity(visibility, group, users, status)
+ end
+
+ defp insert_activity(:mentions, visibility, group, users, _opts) do
+ mentions = user_mentions(users)
+
+ status = Enum.join(mentions, ", ") <> " simple status with mentions"
+ insert_remote_activity(visibility, group, users, status)
+ end
+
+ defp insert_activity(:hell_thread, visibility, group, users, _)
+ when group in @remote_groups do
+ mentions = hell_thread_mentions(users)
+ insert_remote_activity(visibility, group, users, mentions <> " remote hell thread status")
+ end
+
+ defp insert_activity(:hell_thread, visibility, group, users, _opts) do
+ mentions = hell_thread_mentions(users)
+
+ insert_local_activity(visibility, group, users, mentions <> " hell thread status")
+ end
+
+ defp insert_activity(:attachment, visibility, group, users, _opts) do
+ actor = get_actor(group, users)
+
+ obj_data = %{
+ "actor" => actor.ap_id,
+ "name" => "4467-11.jpg",
+ "type" => "Document",
+ "url" => [
+ %{
+ "href" =>
+ "#{Pleroma.Web.base_url()}/media/b1b873552422a07bf53af01f3c231c841db4dfc42c35efde681abaf0f2a4eab7.jpg",
+ "mediaType" => "image/jpeg",
+ "type" => "Link"
+ }
+ ]
+ }
+
+ object = Repo.insert!(%Pleroma.Object{data: obj_data})
+
+ {:ok, _activity} =
+ CommonAPI.post(actor, %{
+ status: "Post with attachment",
+ visibility: visibility,
+ media_ids: [object.id]
+ })
+ end
+
+ defp insert_activity(:tag, visibility, group, users, _opts) do
+ insert_local_activity(visibility, group, users, "Status with #tag")
+ end
+
+ defp insert_activity(:like, visibility, group, users, opts) do
+ actor = get_actor(group, users)
+
+ with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
+ {:ok, _activity} <- CommonAPI.favorite(actor, activity_id) do
+ :ok
+ else
+ {:error, _} ->
+ insert_activity(:like, visibility, group, users, opts)
+
+ nil ->
+ Process.sleep(15)
+ insert_activity(:like, visibility, group, users, opts)
+ end
+ end
+
+ defp insert_activity(:reblog, visibility, group, users, opts) do
+ actor = get_actor(group, users)
+
+ with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
+ {:ok, _activity} <- CommonAPI.repeat(activity_id, actor) do
+ :ok
+ else
+ {:error, _} ->
+ insert_activity(:reblog, visibility, group, users, opts)
+
+ nil ->
+ Process.sleep(15)
+ insert_activity(:reblog, visibility, group, users, opts)
+ end
+ end
+
+ defp insert_activity(:simple_thread, "direct", group, users, _opts) do
+ actor = get_actor(group, users)
+ tasks = get_reply_tasks("direct", group)
+
+ list =
+ case group do
+ :user ->
+ group = Enum.random(@friends_groups)
+ Enum.take(users[group], 3)
+
+ _ ->
+ Enum.take(users[group], 3)
+ end
+
+ data = Enum.map(list, &("@" <> &1.nickname))
+
+ {:ok, activity} =
+ CommonAPI.post(actor, %{
+ status: Enum.join(data, ", ") <> "simple status",
+ visibility: "direct"
+ })
+
+ acc = {activity.id, ["@" <> users[:user].nickname | data] ++ ["reply to status"]}
+ insert_direct_replies(tasks, users[:user], list, acc)
+ end
+
+ defp insert_activity(:simple_thread, visibility, group, users, _opts) do
+ actor = get_actor(group, users)
+ tasks = get_reply_tasks(visibility, group)
+
+ {:ok, activity} =
+ CommonAPI.post(users[:user], %{status: "Simple status", visibility: visibility})
+
+ acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
+ insert_replies(tasks, visibility, users, acc)
+ end
+
+ defp get_actor(:user, %{user: user}), do: user
+ defp get_actor(group, users), do: Enum.random(users[group])
+
+ defp other_data(actor, content) do
+ %{host: host} = URI.parse(actor.ap_id)
+ datetime = DateTime.utc_now()
+ context_id = "https://#{host}/contexts/#{UUID.generate()}"
+ activity_id = "https://#{host}/activities/#{UUID.generate()}"
+ object_id = "https://#{host}/objects/#{UUID.generate()}"
+
+ activity_data = %{
+ "actor" => actor.ap_id,
+ "context" => context_id,
+ "id" => activity_id,
+ "published" => datetime,
+ "type" => "Create",
+ "directMessage" => false
+ }
+
+ object_data = %{
+ "actor" => actor.ap_id,
+ "attachment" => [],
+ "attributedTo" => actor.ap_id,
+ "bcc" => [],
+ "bto" => [],
+ "content" => content,
+ "context" => context_id,
+ "conversation" => context_id,
+ "emoji" => %{},
+ "id" => object_id,
+ "published" => datetime,
+ "sensitive" => false,
+ "summary" => "",
+ "tag" => [],
+ "to" => ["https://www.w3.org/ns/activitystreams#Public"],
+ "type" => "Note"
+ }
+
+ {activity_data, object_data}
+ end
+
+ defp prepare_activity_data(actor, "public", _mention) do
+ obj_data = %{
+ "cc" => [actor.follower_address],
+ "to" => [Constants.as_public()]
+ }
+
+ act_data = %{
+ "cc" => [actor.follower_address],
+ "to" => [Constants.as_public()]
+ }
+
+ {act_data, obj_data}
+ end
+
+ defp prepare_activity_data(actor, "private", _mention) do
+ obj_data = %{
+ "cc" => [],
+ "to" => [actor.follower_address]
+ }
+
+ act_data = %{
+ "cc" => [],
+ "to" => [actor.follower_address]
+ }
+
+ {act_data, obj_data}
+ end
+
+ defp prepare_activity_data(actor, "unlisted", _mention) do
+ obj_data = %{
+ "cc" => [Constants.as_public()],
+ "to" => [actor.follower_address]
+ }
+
+ act_data = %{
+ "cc" => [Constants.as_public()],
+ "to" => [actor.follower_address]
+ }
+
+ {act_data, obj_data}
+ end
+
+ defp prepare_activity_data(_actor, "direct", mention) do
+ %{host: mentioned_host} = URI.parse(mention.ap_id)
+
+ obj_data = %{
+ "cc" => [],
+ "content" =>
+ "<span class=\"h-card\"><a class=\"u-url mention\" href=\"#{mention.ap_id}\" rel=\"ugc\">@<span>#{
+ mention.nickname
+ }</span></a></span> direct message",
+ "tag" => [
+ %{
+ "href" => mention.ap_id,
+ "name" => "@#{mention.nickname}@#{mentioned_host}",
+ "type" => "Mention"
+ }
+ ],
+ "to" => [mention.ap_id]
+ }
+
+ act_data = %{
+ "cc" => [],
+ "directMessage" => true,
+ "to" => [mention.ap_id]
+ }
+
+ {act_data, obj_data}
+ end
+
+ defp get_reply_tasks("public", :user) do
+ [:friends_local, :friends_remote, :non_friends_local, :non_friends_remote, :user]
+ end
+
+ defp get_reply_tasks("public", group) when group in @friends_groups do
+ [:non_friends_local, :non_friends_remote, :user, :friends_local, :friends_remote]
+ end
+
+ defp get_reply_tasks("public", group) when group in @non_friends_groups do
+ [:user, :friends_local, :friends_remote, :non_friends_local, :non_friends_remote]
+ end
+
+ defp get_reply_tasks(visibility, :user) when visibility in ["unlisted", "private"] do
+ [:friends_local, :friends_remote, :user, :friends_local, :friends_remote]
+ end
+
+ defp get_reply_tasks(visibility, group)
+ when visibility in ["unlisted", "private"] and group in @friends_groups do
+ [:user, :friends_remote, :friends_local, :user]
+ end
+
+ defp get_reply_tasks(visibility, group)
+ when visibility in ["unlisted", "private"] and
+ group in @non_friends_groups,
+ do: []
+
+ defp get_reply_tasks("direct", :user), do: [:friends_local, :user, :friends_remote]
+
+ defp get_reply_tasks("direct", group) when group in @friends_groups,
+ do: [:user, group, :user]
+
+ defp get_reply_tasks("direct", group) when group in @non_friends_groups do
+ [:user, :non_friends_remote, :user, :non_friends_local]
+ end
+
+ defp insert_replies(tasks, visibility, users, acc) do
+ Enum.reduce(tasks, acc, fn
+ :user, {id, data} ->
+ insert_reply(users[:user], data, id, visibility)
+
+ group, {id, data} ->
+ replier = Enum.random(users[group])
+ insert_reply(replier, data, id, visibility)
+ end)
+ end
+
+ defp insert_direct_replies(tasks, user, list, acc) do
+ Enum.reduce(tasks, acc, fn
+ :user, {id, data} ->
+ {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
+ {reply_id, data}
+
+ _, {id, data} ->
+ actor = Enum.random(list)
+
+ {reply_id, _} =
+ insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct")
+
+ {reply_id, data}
+ end)
+ end
+
+ defp insert_reply(actor, data, activity_id, visibility) do
+ {:ok, reply} =
+ CommonAPI.post(actor, %{
+ status: Enum.join(data, ", "),
+ visibility: visibility,
+ in_reply_to_status_id: activity_id
+ })
+
+ {reply.id, ["@" <> actor.nickname | data]}
+ end
+
+ defp get_random_mentions(_users, count) when count == 0, do: []
+
+ defp get_random_mentions(users, count) do
+ users
+ |> Enum.shuffle()
+ |> Enum.take(count)
+ |> Enum.map(&"@#{&1.nickname}")
+ end
+
+ defp get_random_create_activity_id do
+ Repo.one(
+ from(a in Pleroma.Activity,
+ where: fragment("(?)->>'type' = ?", a.data, ^"Create"),
+ order_by: fragment("RANDOM()"),
+ limit: 1,
+ select: a.id
+ )
+ )
+ end
+end