path: root/lib/pleroma/web/fed_sockets/fed_registry.ex
diff options
authorrinpatch <>2020-11-12 12:34:48 +0000
committerrinpatch <>2020-11-12 12:34:48 +0000
commit1172844ed18d94d84724dc6f11c6e9f72e0ba6ec (patch)
tree7d48a259e08856ab6db0eba255f20c0c19410463 /lib/pleroma/web/fed_sockets/fed_registry.ex
parenta0f5e8b27edbe2224d9c2c3997ad5b8ea484244b (diff)
parentb4c6b262d6dc12362f0014a864e8aed6c727c39c (diff)
Merge branch 'release/2.2.0' into 'stable'v2.2.0
Release/2.2.0 See merge request pleroma/secteam/pleroma!19
Diffstat (limited to 'lib/pleroma/web/fed_sockets/fed_registry.ex')
1 files changed, 185 insertions, 0 deletions
diff --git a/lib/pleroma/web/fed_sockets/fed_registry.ex b/lib/pleroma/web/fed_sockets/fed_registry.ex
new file mode 100644
index 000000000..e00ea69c0
--- /dev/null
+++ b/lib/pleroma/web/fed_sockets/fed_registry.ex
@@ -0,0 +1,185 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2020 Pleroma Authors <>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Web.FedSockets.FedRegistry do
+ @moduledoc """
+ The FedRegistry stores the active FedSockets for quick retrieval.
+ The storage and retrieval portion of the FedRegistry is done in process through
+ elixir's `Registry` module for speed and its ability to monitor for terminated processes.
+ Dropped connections will be caught by `Registry` and deleted. Since the next
+ message will initiate a new connection there is no reason to try and reconnect at that point.
+ Normally outside modules should have no need to call or use the FedRegistry themselves.
+ """
+ alias Pleroma.Web.FedSockets.FedSocket
+ alias Pleroma.Web.FedSockets.SocketInfo
+ require Logger
+ @default_rejection_duration 15 * 60 * 1000
+ @rejections :fed_socket_rejections
+ @doc """
+ Retrieves a FedSocket from the Registry given it's origin.
+ The origin is expected to be a string identifying the endpoint "" or ""
+ Will return:
+ * {:ok, fed_socket} for working FedSockets
+ * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
+ * {:error, some_reason} usually :missing for unknown origins
+ """
+ def get_fed_socket(origin) do
+ case get_registry_data(origin) do
+ {:error, reason} ->
+ {:error, reason}
+ {:ok, %{state: :connected} = socket_info} ->
+ {:ok, socket_info}
+ end
+ end
+ @doc """
+ Adds a connected FedSocket to the Registry.
+ Always returns {:ok, fed_socket}
+ """
+ def add_fed_socket(origin, pid \\ nil) do
+ origin
+ |>
+ |> SocketInfo.connect()
+ |> add_socket_info
+ end
+ defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
+ case Registry.register(FedSockets.Registry, origin, socket_info) do
+ {:ok, _owner} ->
+ clear_prior_rejection(origin)
+ Logger.debug("fedsocket added: #{inspect(origin)}")
+ {:ok, socket_info}
+ {:error, {:already_registered, _pid}} ->
+ FedSocket.close(socket_info)
+ existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
+ {:ok, existing_socket_info}
+ _ ->
+ {:error, :error_adding_socket}
+ end
+ end
+ @doc """
+ Mark this origin as having rejected a connection attempt.
+ This will keep it from getting additional connection attempts
+ for a period of time specified in the config.
+ Always returns {:ok, new_reg_data}
+ """
+ def set_host_rejected(uri) do
+ new_reg_data =
+ uri
+ |> SocketInfo.origin()
+ |> get_or_create_registry_data()
+ |> set_to_rejected()
+ |> save_registry_data()
+ {:ok, new_reg_data}
+ end
+ @doc """
+ Retrieves the FedRegistryData from the Registry given it's origin.
+ The origin is expected to be a string identifying the endpoint "" or ""
+ Will return:
+ * {:ok, fed_registry_data} for known origins
+ * {:error, :missing} for uniknown origins
+ * {:error, :cache_error} indicating some low level runtime issues
+ """
+ def get_registry_data(origin) do
+ case Registry.lookup(FedSockets.Registry, origin) do
+ [] ->
+ if is_rejected?(origin) do
+ Logger.debug("previously rejected fedsocket requested")
+ {:error, :rejected}
+ else
+ {:error, :missing}
+ end
+ [{_pid, %{state: :connected} = socket_info}] ->
+ {:ok, socket_info}
+ _ ->
+ {:error, :cache_error}
+ end
+ end
+ @doc """
+ Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
+ """
+ def list_all do
+ (list_all_connected() ++ list_all_rejected())
+ |> Enum.into(%{})
+ end
+ defp list_all_connected do
+ FedSockets.Registry
+ |>[{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
+ end
+ defp list_all_rejected do
+ {:ok, keys} = Cachex.keys(@rejections)
+ {:ok, registry_data} =
+ Cachex.execute(@rejections, fn worker ->
+, fn k -> {k, Cachex.get!(worker, k)} end)
+ end)
+ registry_data
+ end
+ defp clear_prior_rejection(origin),
+ do: Cachex.del(@rejections, origin)
+ defp is_rejected?(origin) do
+ case Cachex.get(@rejections, origin) do
+ {:ok, nil} ->
+ false
+ {:ok, _} ->
+ true
+ end
+ end
+ defp get_or_create_registry_data(origin) do
+ case get_registry_data(origin) do
+ {:error, :missing} ->
+ %SocketInfo{origin: origin}
+ {:ok, socket_info} ->
+ socket_info
+ end
+ end
+ defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
+ {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
+ socket_info
+ end
+ defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
+ rejection_expiration =
+ Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
+ {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
+ socket_info
+ end
+ defp set_to_rejected(%SocketInfo{} = socket_info),
+ do: %SocketInfo{socket_info | state: :rejected}