From 608c964220c362c5f26df88a3a958cced7a2dc58 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Wed, 29 Dec 2021 18:33:53 +0000 Subject: Finch everywhere --- config/config.exs | 50 +------ config/test.exs | 4 - lib/mix/pleroma.ex | 38 ++--- lib/mix/tasks/pleroma/benchmark.ex | 36 ----- lib/pleroma/application.ex | 59 +------- lib/pleroma/config/deprecation_warnings.ex | 46 ------ lib/pleroma/config/transfer_task.ex | 5 +- lib/pleroma/emoji/pack.ex | 2 +- lib/pleroma/frontend.ex | 2 +- lib/pleroma/helpers/media_helper.ex | 4 +- lib/pleroma/http.ex | 34 +---- lib/pleroma/http/adapter_helper.ex | 14 +- lib/pleroma/http/ex_aws.ex | 2 - lib/pleroma/http/tzdata.ex | 4 - lib/pleroma/instances/instance.ex | 2 +- lib/pleroma/reverse_proxy.ex | 154 ++++++--------------- lib/pleroma/telemetry/logger.ex | 108 +++++++-------- lib/pleroma/uploaders/s3.ex | 23 +-- .../activity_pub/mrf/media_proxy_warming_policy.ex | 1 - .../web/media_proxy/media_proxy_controller.ex | 2 +- lib/pleroma/web/plugs/uploaded_media.ex | 3 +- lib/pleroma/web/rel_me.ex | 1 - lib/pleroma/web/rich_media/helpers.ex | 1 - mix.exs | 2 +- test/pleroma/config/deprecation_warnings_test.exs | 44 ------ test/pleroma/reverse_proxy_test.exs | 38 ----- test/test_helper.exs | 3 - 27 files changed, 123 insertions(+), 559 deletions(-) diff --git a/config/config.exs b/config/config.exs index 821166fd9..273027938 100644 --- a/config/config.exs +++ b/config/config.exs @@ -168,7 +168,7 @@ "application/ld+json" => ["activity+json"] } -config :tesla, adapter: Tesla.Adapter.Hackney +config :tesla, :adapter, {Tesla.Adapter.Finch, name: MyFinch} # Configures http settings, upstream proxy etc. config :pleroma, :http, @@ -435,8 +435,7 @@ # Note: max_read_duration defaults to Pleroma.ReverseProxy.max_read_duration_default/1 max_read_duration: 30_000, http: [ - follow_redirect: true, - pool: :media + follow_redirect: true ] ], whitelist: [] @@ -782,51 +781,6 @@ parameters: [gin_fuzzy_search_limit: "500"], prepare: :unnamed -config :pleroma, :connections_pool, - reclaim_multiplier: 0.1, - connection_acquisition_wait: 250, - connection_acquisition_retries: 5, - max_connections: 250, - max_idle_time: 30_000, - retry: 0, - connect_timeout: 5_000 - -config :pleroma, :pools, - federation: [ - size: 50, - max_waiting: 10, - recv_timeout: 10_000 - ], - media: [ - size: 50, - max_waiting: 20, - recv_timeout: 15_000 - ], - upload: [ - size: 25, - max_waiting: 5, - recv_timeout: 15_000 - ], - default: [ - size: 10, - max_waiting: 2, - recv_timeout: 5_000 - ] - -config :pleroma, :hackney_pools, - federation: [ - max_connections: 50, - timeout: 150_000 - ], - media: [ - max_connections: 50, - timeout: 150_000 - ], - upload: [ - max_connections: 25, - timeout: 300_000 - ] - config :pleroma, :majic_pool, size: 2 private_instance? = :if_instance_is_private diff --git a/config/test.exs b/config/test.exs index 78303eb1d..2234db9be 100644 --- a/config/test.exs +++ b/config/test.exs @@ -101,12 +101,8 @@ config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp35v0RK9SO8WTPr6QZ" -config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock - config :pleroma, :modules, runtime_dir: "test/fixtures/modules" -config :pleroma, Pleroma.Gun, Pleroma.GunMock - config :pleroma, Pleroma.Emails.NewUsersDigestEmail, enabled: true config :pleroma, Pleroma.Web.Plugs.RemoteIp, enabled: false diff --git a/lib/mix/pleroma.ex b/lib/mix/pleroma.ex index 2976085ba..5cb006d98 100644 --- a/lib/mix/pleroma.ex +++ b/lib/mix/pleroma.ex @@ -28,16 +28,7 @@ def start_pleroma do Logger.remove_backend(:console) end - adapter = Application.get_env(:tesla, :adapter) - - apps = - if adapter == Tesla.Adapter.Gun do - [:gun | @apps] - else - [:hackney | @apps] - end - - Enum.each(apps, &Application.ensure_all_started/1) + Enum.each(@apps, &Application.ensure_all_started/1) oban_config = [ crontab: [], @@ -47,17 +38,15 @@ def start_pleroma do plugins: [] ] - children = - [ - Pleroma.Repo, - Pleroma.Emoji, - {Pleroma.Config.TransferTask, false}, - Pleroma.Web.Endpoint, - {Oban, oban_config}, - {Majic.Pool, - [name: Pleroma.MajicPool, pool_size: Pleroma.Config.get([:majic_pool, :size], 2)]} - ] ++ - http_children(adapter) + children = [ + Pleroma.Repo, + Pleroma.Emoji, + {Pleroma.Config.TransferTask, false}, + Pleroma.Web.Endpoint, + {Oban, oban_config}, + {Majic.Pool, + [name: Pleroma.MajicPool, pool_size: Pleroma.Config.get([:majic_pool, :size], 2)]} + ] cachex_children = Enum.map(@cachex_children, &Pleroma.Application.build_cachex(&1, [])) @@ -129,11 +118,4 @@ def mix_shell?, do: :erlang.function_exported(Mix, :shell, 0) def escape_sh_path(path) do ~S(') <> String.replace(path, ~S('), ~S(\')) <> ~S(') end - - defp http_children(Tesla.Adapter.Gun) do - Pleroma.Gun.ConnectionPool.children() ++ - [{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}] - end - - defp http_children(_), do: [] end diff --git a/lib/mix/tasks/pleroma/benchmark.ex b/lib/mix/tasks/pleroma/benchmark.ex index f32492169..7a4680812 100644 --- a/lib/mix/tasks/pleroma/benchmark.ex +++ b/lib/mix/tasks/pleroma/benchmark.ex @@ -74,40 +74,4 @@ def run(["render_timeline", nickname | _] = args) do inputs: inputs ) end - - def run(["adapters"]) do - start_pleroma() - - :ok = - Pleroma.Gun.Conn.open( - "https://httpbin.org/stream-bytes/1500", - :gun_connections - ) - - Process.sleep(1_500) - - Benchee.run( - %{ - "Without conn and without pool" => fn -> - {:ok, %Tesla.Env{}} = - Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], - pool: :no_pool, - receive_conn: false - ) - end, - "Without conn and with pool" => fn -> - {:ok, %Tesla.Env{}} = - Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], receive_conn: false) - end, - "With reused conn and without pool" => fn -> - {:ok, %Tesla.Env{}} = - Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], pool: :no_pool) - end, - "With reused conn and with pool" => fn -> - {:ok, %Tesla.Env{}} = Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500") - end - }, - parallel: 10 - ) - end end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1c1db8c10..48ee29ebf 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -59,34 +59,8 @@ def start(_type, _args) do Pleroma.Docs.JSON.compile() limiters_setup() - adapter = Application.get_env(:tesla, :adapter) - - if match?({Tesla.Adapter.Finch, _}, adapter) do - Logger.info("Starting Finch") - Finch.start_link(name: MyFinch) - end - - if adapter == Tesla.Adapter.Gun do - if version = Pleroma.OTPVersion.version() do - [major, minor] = - version - |> String.split(".") - |> Enum.map(&String.to_integer/1) - |> Enum.take(2) - - if (major == 22 and minor < 2) or major < 22 do - raise " - !!!OTP VERSION WARNING!!! - You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. Please update your Erlang/OTP to at least 22.2. - " - end - else - raise " - !!!OTP VERSION WARNING!!! - To support correct handling of unordered certificates chains - OTP version must be > 22.2. - " - end - end + Logger.info("Starting Finch") + Finch.start_link(name: MyFinch) # Define workers and child supervisors to be supervised children = @@ -98,7 +72,6 @@ def start(_type, _args) do {Task.Supervisor, name: Pleroma.TaskSupervisor} ] ++ cachex_children() ++ - http_children(adapter, @mix_env) ++ [ Pleroma.Stats, Pleroma.JobQueueMonitor, @@ -289,34 +262,6 @@ defp task_children(_) do ] end - # start hackney and gun pools in tests - defp http_children(_, :test) do - http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil) - end - - defp http_children(Tesla.Adapter.Hackney, _) do - pools = [:federation, :media] - - pools = - if Config.get([Pleroma.Upload, :proxy_remote]) do - [:upload | pools] - else - pools - end - - for pool <- pools do - options = Config.get([:hackney_pools, pool]) - :hackney_pool.child_spec(pool, options) - end - end - - defp http_children(Tesla.Adapter.Gun, _) do - Pleroma.Gun.ConnectionPool.children() ++ - [{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}] - end - - defp http_children(_, _), do: [] - @spec limiters_setup() :: :ok def limiters_setup do config = Config.get(ConcurrentLimiter, []) diff --git a/lib/pleroma/config/deprecation_warnings.ex b/lib/pleroma/config/deprecation_warnings.ex index b53b15d95..aad67c51d 100644 --- a/lib/pleroma/config/deprecation_warnings.ex +++ b/lib/pleroma/config/deprecation_warnings.ex @@ -210,7 +210,6 @@ def warn do check_old_mrf_config(), check_media_proxy_whitelist_config(), check_welcome_message_config(), - check_gun_pool_options(), check_activity_expiration_config(), check_remote_ip_plug_name(), check_uploders_s3_public_endpoint(), @@ -295,51 +294,6 @@ def check_media_proxy_whitelist_config do end end - def check_gun_pool_options do - pool_config = Config.get(:connections_pool) - - if timeout = pool_config[:await_up_timeout] do - Logger.warn(""" - !!!DEPRECATION WARNING!!! - Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`. Please change to `config :pleroma, :connections_pool, connect_timeout` to ensure compatibility with future releases. - """) - - Config.put(:connections_pool, Keyword.put_new(pool_config, :connect_timeout, timeout)) - end - - pools_configs = Config.get(:pools) - - warning_preface = """ - !!!DEPRECATION WARNING!!! - Your config is using old setting name `timeout` instead of `recv_timeout` in pool settings. The setting will not take effect until updated. - """ - - updated_config = - Enum.reduce(pools_configs, [], fn {pool_name, config}, acc -> - if timeout = config[:timeout] do - Keyword.put(acc, pool_name, Keyword.put_new(config, :recv_timeout, timeout)) - else - acc - end - end) - - if updated_config != [] do - pool_warnings = - updated_config - |> Keyword.keys() - |> Enum.map(fn pool_name -> - "\n* `:timeout` options in #{pool_name} pool is now `:recv_timeout`" - end) - - Logger.warn(Enum.join([warning_preface | pool_warnings])) - - Config.put(:pools, updated_config) - :error - else - :ok - end - end - @spec check_activity_expiration_config() :: :ok | nil def check_activity_expiration_config do warning_preface = """ diff --git a/lib/pleroma/config/transfer_task.ex b/lib/pleroma/config/transfer_task.ex index 44a984019..3d6be1f75 100644 --- a/lib/pleroma/config/transfer_task.ex +++ b/lib/pleroma/config/transfer_task.ex @@ -15,14 +15,11 @@ defmodule Pleroma.Config.TransferTask do defp reboot_time_keys, do: [ - {:pleroma, :hackney_pools}, {:pleroma, :shout}, {:pleroma, Oban}, {:pleroma, :rate_limit}, {:pleroma, :markup}, - {:pleroma, :streamer}, - {:pleroma, :pools}, - {:pleroma, :connections_pool} + {:pleroma, :streamer} ] defp reboot_time_subkeys, diff --git a/lib/pleroma/emoji/pack.ex b/lib/pleroma/emoji/pack.ex index a361ea200..008830b81 100644 --- a/lib/pleroma/emoji/pack.ex +++ b/lib/pleroma/emoji/pack.ex @@ -542,7 +542,7 @@ defp get_filename(pack, shortcode) do defp http_get(%URI{} = url), do: url |> to_string() |> http_get() defp http_get(url) do - with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], pool: :default) do + with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], []) do Jason.decode(body) end end diff --git a/lib/pleroma/frontend.ex b/lib/pleroma/frontend.ex index ec72fb6a4..5f6621cb0 100644 --- a/lib/pleroma/frontend.ex +++ b/lib/pleroma/frontend.ex @@ -92,7 +92,7 @@ defp download_build(frontend_info, dest) do url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"]) with {:ok, %{status: 200, body: zip_body}} <- - Pleroma.HTTP.get(url, [], pool: :media, recv_timeout: 120_000) do + Pleroma.HTTP.get(url, [], recv_timeout: 120_000) do unzip(zip_body, dest) else {:error, e} -> {:error, e} diff --git a/lib/pleroma/helpers/media_helper.ex b/lib/pleroma/helpers/media_helper.ex index 24c845fcd..4b0618435 100644 --- a/lib/pleroma/helpers/media_helper.ex +++ b/lib/pleroma/helpers/media_helper.ex @@ -24,7 +24,7 @@ def missing_dependencies do def image_resize(url, options) do with executable when is_binary(executable) <- System.find_executable("convert"), {:ok, args} <- prepare_image_resize_args(options), - {:ok, env} <- HTTP.get(url, [], pool: :media), + {:ok, env} <- HTTP.get(url, [], []), {:ok, fifo_path} <- mkfifo() do args = List.flatten([fifo_path, args]) run_fifo(fifo_path, env, executable, args) @@ -73,7 +73,7 @@ defp prepare_image_resize_args(_), do: {:error, :missing_options} # Note: video thumbnail is intentionally not resized (always has original dimensions) def video_framegrab(url) do with executable when is_binary(executable) <- System.find_executable("ffmpeg"), - {:ok, env} <- HTTP.get(url, [], pool: :media), + {:ok, env} <- HTTP.get(url, [], []), {:ok, fifo_path} <- mkfifo(), args = [ "-y", diff --git a/lib/pleroma/http.ex b/lib/pleroma/http.ex index d41061538..822c41a0d 100644 --- a/lib/pleroma/http.ex +++ b/lib/pleroma/http.ex @@ -66,17 +66,9 @@ def request(method, url, body, headers, options) when is_binary(url) do params = options[:params] || [] request = build_request(method, headers, options, url, body, params) - adapter = Application.get_env(:tesla, :adapter) + client = Tesla.client([Tesla.Middleware.FollowRedirects]) - client = Tesla.client(adapter_middlewares(adapter), adapter) - - maybe_limit( - fn -> - request(client, request) - end, - adapter, - adapter_opts - ) + request(client, request) end @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()} @@ -92,26 +84,4 @@ defp build_request(method, headers, options, url, body, params) do |> Builder.add_param(:query, :query, params) |> Builder.convert_to_keyword() end - - @prefix Pleroma.Gun.ConnectionPool - defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do - ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun) - end - - defp maybe_limit(fun, _, _) do - fun.() - end - - defp adapter_middlewares(Tesla.Adapter.Gun) do - [Tesla.Middleware.FollowRedirects, Pleroma.Tesla.Middleware.ConnectionPool] - end - - defp adapter_middlewares(_) do - if Pleroma.Config.get(:env) == :test do - # Emulate redirects in test env, which are handled by adapters in other environments - [Tesla.Middleware.FollowRedirects] - else - [] - end - end end diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 252a6aba5..613443292 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -6,7 +6,7 @@ defmodule Pleroma.HTTP.AdapterHelper do @moduledoc """ Configure Tesla.Client with default and customized adapter options. """ - @defaults [pool: :federation, connect_timeout: 5_000, recv_timeout: 5_000] + @defaults [name: MyFinch, connect_timeout: 5_000, recv_timeout: 5_000] @type proxy_type() :: :socks4 | :socks5 @type host() :: charlist() | :inet.ip_address() @@ -43,17 +43,7 @@ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) def options(%URI{} = uri, opts \\ []) do @defaults |> Keyword.merge(opts) - |> adapter_helper().options(uri) - end - - defp adapter, do: Application.get_env(:tesla, :adapter) - - defp adapter_helper do - case adapter() do - Tesla.Adapter.Gun -> AdapterHelper.Gun - Tesla.Adapter.Hackney -> AdapterHelper.Hackney - _ -> AdapterHelper.Default - end + |> AdapterHelper.Default.options(uri) end @spec parse_proxy(String.t() | tuple() | nil) :: diff --git a/lib/pleroma/http/ex_aws.ex b/lib/pleroma/http/ex_aws.ex index 469c13819..30299da53 100644 --- a/lib/pleroma/http/ex_aws.ex +++ b/lib/pleroma/http/ex_aws.ex @@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.ExAws do @impl true def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do - http_opts = Keyword.put_new(http_opts, :pool, :upload) - case HTTP.request(method, url, body, headers, http_opts) do {:ok, env} -> {:ok, %{status_code: env.status, headers: env.headers, body: env.body}} diff --git a/lib/pleroma/http/tzdata.ex b/lib/pleroma/http/tzdata.ex index 5d2529c08..75586ad2c 100644 --- a/lib/pleroma/http/tzdata.ex +++ b/lib/pleroma/http/tzdata.ex @@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.Tzdata do @impl true def get(url, headers, options) do - options = Keyword.put_new(options, :pool, :default) - with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do {:ok, {env.status, env.headers, env.body}} end @@ -20,8 +18,6 @@ def get(url, headers, options) do @impl true def head(url, headers, options) do - options = Keyword.put_new(options, :pool, :default) - with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do {:ok, {env.status, env.headers}} end diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index a5529ad44..d5fb61fc2 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -170,7 +170,7 @@ defp scrape_favicon(%URI{} = instance_uri) do try do with {_, true} <- {:reachable, reachable?(instance_uri.host)}, {:ok, %Tesla.Env{body: html}} <- - Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], pool: :media), + Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], []), {_, [favicon_rel | _]} when is_binary(favicon_rel) <- {:parse, html |> Floki.parse_document!() |> Floki.attribute("link[rel=icon]", "href")}, diff --git a/lib/pleroma/reverse_proxy.ex b/lib/pleroma/reverse_proxy.ex index 2248c2713..a6d5e88af 100644 --- a/lib/pleroma/reverse_proxy.ex +++ b/lib/pleroma/reverse_proxy.ex @@ -59,11 +59,7 @@ def default_cache_control_header, do: @default_cache_control_header * `req_headers`, `resp_headers` additional headers. - * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun). - """ - @default_options [pool: :media] - @inline_content_types [ "image/gif", "image/jpeg", @@ -94,7 +90,7 @@ def default_cache_control_header, do: @default_cache_control_header def call(_conn, _url, _opts \\ []) def call(conn = %{method: method}, url, opts) when method in @methods do - client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, [])) + client_opts = Keyword.get(opts, :http, []) req_headers = build_req_headers(conn.req_headers, opts) @@ -106,32 +102,39 @@ def call(conn = %{method: method}, url, opts) when method in @methods do end with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url), - {:ok, code, headers, client} <- request(method, url, req_headers, client_opts), + {:ok, status, headers, body} <- request(method, url, req_headers, client_opts), :ok <- header_length_constraint( headers, Keyword.get(opts, :max_body_length, @max_body_length) ) do - response(conn, client, url, code, headers, opts) + conn + |> put_private(:proxied_url, url) + |> response(body, status, headers, opts) else {:ok, true} -> conn - |> error_or_redirect(url, 500, "Request failed", opts) + |> error_or_redirect(500, "Request failed", opts) |> halt() - {:ok, code, headers} -> - head_response(conn, url, code, headers, opts) + {:ok, status, headers} -> + conn + |> put_private(:proxied_url, url) + |> head_response(status, headers, opts) |> halt() - {:error, {:invalid_http_response, code}} -> - Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}") - track_failed_url(url, code, opts) + {:error, {:invalid_http_response, status}} -> + Logger.error( + "#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{status}" + ) + + track_failed_url(url, status, opts) conn + |> put_private(:proxied_url, url) |> error_or_redirect( - url, - code, - "Request failed: " <> Plug.Conn.Status.reason_phrase(code), + status, + "Request failed: " <> Plug.Conn.Status.reason_phrase(status), opts ) |> halt() @@ -141,7 +144,8 @@ def call(conn = %{method: method}, url, opts) when method in @methods do track_failed_url(url, error, opts) conn - |> error_or_redirect(url, 500, "Request failed", opts) + |> put_private(:proxied_url, url) + |> error_or_redirect(500, "Request failed", opts) |> halt() end end @@ -156,93 +160,48 @@ defp request(method, url, headers, opts) do Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}") method = method |> String.downcase() |> String.to_existing_atom() - case client().request(method, url, headers, "", opts) do - {:ok, code, headers, client} when code in @valid_resp_codes -> - {:ok, code, downcase_headers(headers), client} + opts = opts ++ [receive_timeout: @max_read_duration] - {:ok, code, headers} when code in @valid_resp_codes -> - {:ok, code, downcase_headers(headers)} + case Pleroma.HTTP.request(method, url, "", headers, opts) do + {:ok, %Tesla.Env{status: status, headers: headers, body: body}} + when status in @valid_resp_codes -> + {:ok, status, downcase_headers(headers), body} - {:ok, code, _, _} -> - {:error, {:invalid_http_response, code}} + {:ok, %Tesla.Env{status: status, headers: headers}} when status in @valid_resp_codes -> + {:ok, status, downcase_headers(headers)} - {:ok, code, _} -> - {:error, {:invalid_http_response, code}} + {:ok, %Tesla.Env{status: status}} -> + {:error, {:invalid_http_response, status}} {:error, error} -> {:error, error} end end - defp response(conn, client, url, status, headers, opts) do - Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}") - - result = - conn - |> put_resp_headers(build_resp_headers(headers, opts)) - |> send_chunked(status) - |> chunk_reply(client, opts) - - case result do - {:ok, conn} -> - halt(conn) - - {:error, :closed, conn} -> - client().close(client) - halt(conn) + defp response(conn, body, status, headers, opts) do + Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}") - {:error, error, conn} -> - Logger.warn( - "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}" - ) - - client().close(client) - halt(conn) - end - end - - defp chunk_reply(conn, client, opts) do - chunk_reply(conn, client, opts, 0, 0) - end - - defp chunk_reply(conn, client, opts, sent_so_far, duration) do - with {:ok, duration} <- - check_read_duration( - duration, - Keyword.get(opts, :max_read_duration, @max_read_duration) - ), - {:ok, data, client} <- client().stream_body(client), - {:ok, duration} <- increase_read_duration(duration), - sent_so_far = sent_so_far + byte_size(data), - :ok <- - body_size_constraint( - sent_so_far, - Keyword.get(opts, :max_body_length, @max_body_length) - ), - {:ok, conn} <- chunk(conn, data) do - chunk_reply(conn, client, opts, sent_so_far, duration) - else - :done -> {:ok, conn} - {:error, error} -> {:error, error, conn} - end + conn + |> put_resp_headers(build_resp_headers(headers, opts)) + |> send_resp(status, body) end - defp head_response(conn, url, code, headers, opts) do - Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}") + defp head_response(conn, status, headers, opts) do + Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}") conn |> put_resp_headers(build_resp_headers(headers, opts)) - |> send_resp(code, "") + |> send_resp(status, "") end - defp error_or_redirect(conn, url, code, body, opts) do + defp error_or_redirect(conn, status, body, opts) do if Keyword.get(opts, :redirect_on_failure, false) do conn - |> Phoenix.Controller.redirect(external: url) + |> Phoenix.Controller.redirect(external: conn.private[:proxied_url]) |> halt() else conn - |> send_resp(code, body) + |> send_resp(status, body) |> halt end end @@ -382,37 +341,6 @@ defp header_length_constraint(headers, limit) when is_integer(limit) and limit > defp header_length_constraint(_, _), do: :ok - defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do - {:error, :body_too_large} - end - - defp body_size_constraint(_, _), do: :ok - - defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max) - - defp check_read_duration(duration, max) - when is_integer(duration) and is_integer(max) and max > 0 do - if duration > max do - {:error, :read_duration_exceeded} - else - {:ok, {duration, :erlang.system_time(:millisecond)}} - end - end - - defp check_read_duration(_, _), do: {:ok, :no_duration_limit, :no_duration_limit} - - defp increase_read_duration({previous_duration, started}) - when is_integer(previous_duration) and is_integer(started) do - duration = :erlang.system_time(:millisecond) - started - {:ok, previous_duration + duration} - end - - defp increase_read_duration(_) do - {:ok, :no_duration_limit, :no_duration_limit} - end - - defp client, do: Pleroma.ReverseProxy.Client.Wrapper - defp track_failed_url(url, error, opts) do ttl = unless error in [:body_too_large, 400, 204] do diff --git a/lib/pleroma/telemetry/logger.ex b/lib/pleroma/telemetry/logger.ex index 384c70fbc..b237f6811 100644 --- a/lib/pleroma/telemetry/logger.ex +++ b/lib/pleroma/telemetry/logger.ex @@ -8,11 +8,7 @@ defmodule Pleroma.Telemetry.Logger do require Logger @events [ - [:pleroma, :connection_pool, :reclaim, :start], - [:pleroma, :connection_pool, :reclaim, :stop], - [:pleroma, :connection_pool, :provision_failure], - [:pleroma, :connection_pool, :client, :dead], - [:pleroma, :connection_pool, :client, :add] + [:pleroma, :repo, :query] ] def attach do :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, []) @@ -23,68 +19,62 @@ def attach do # out anyway due to higher log level configured def handle_event( - [:pleroma, :connection_pool, :reclaim, :start], - _, - %{max_connections: max_connections, reclaim_max: reclaim_max}, - _ + [:pleroma, :repo, :query] = _name, + %{query_time: query_time} = measurements, + %{source: source} = metadata, + config ) do - Logger.debug(fn -> - "Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{reclaim_max} connections" - end) - end + logging_config = Pleroma.Config.get([:telemetry, :slow_queries_logging], []) - def handle_event( - [:pleroma, :connection_pool, :reclaim, :stop], - %{reclaimed_count: 0}, - _, - _ - ) do - Logger.error(fn -> - "Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts" - end) + if logging_config[:enabled] && + logging_config[:min_duration] && + query_time > logging_config[:min_duration] and + (is_nil(logging_config[:exclude_sources]) or + source not in logging_config[:exclude_sources]) do + log_slow_query(measurements, metadata, config) + else + :ok + end end - def handle_event( - [:pleroma, :connection_pool, :reclaim, :stop], - %{reclaimed_count: reclaimed_count}, - _, - _ - ) do - Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end) - end + defp log_slow_query( + %{query_time: query_time} = _measurements, + %{source: _source, query: query, params: query_params, repo: repo} = _metadata, + _config + ) do + sql_explain = + with {:ok, %{rows: explain_result_rows}} <- + repo.query("EXPLAIN " <> query, query_params, log: false) do + Enum.map_join(explain_result_rows, "\n", & &1) + end - def handle_event( - [:pleroma, :connection_pool, :provision_failure], - %{opts: [key | _]}, - _, - _ - ) do - Logger.error(fn -> - "Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion" - end) - end + {:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace) + + pleroma_stacktrace = + Enum.filter(stacktrace, fn + {__MODULE__, _, _, _} -> + false + + {mod, _, _, _} -> + mod + |> to_string() + |> String.starts_with?("Elixir.Pleroma.") + end) - def handle_event( - [:pleroma, :connection_pool, :client, :dead], - %{client_pid: client_pid, reason: reason}, - %{key: key}, - _ - ) do Logger.warn(fn -> - "Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{inspect(reason)}" - end) - end + """ + Slow query! - def handle_event( - [:pleroma, :connection_pool, :client, :add], - %{clients: [_, _ | _] = clients}, - %{key: key, protocol: :http}, - _ - ) do - Logger.info(fn -> - "Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur." + Total time: #{round(query_time / 1_000)} ms + + #{query} + + #{inspect(query_params, limit: :infinity)} + + #{sql_explain} + + #{Exception.format_stacktrace(pleroma_stacktrace)} + """ end) end - - def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok end diff --git a/lib/pleroma/uploaders/s3.ex b/lib/pleroma/uploaders/s3.ex index 19287c532..aca9697b5 100644 --- a/lib/pleroma/uploaders/s3.ex +++ b/lib/pleroma/uploaders/s3.ex @@ -30,23 +30,12 @@ def put_file(%Pleroma.Upload{} = upload) do op = if streaming do - op = - upload.tempfile - |> ExAws.S3.Upload.stream_file() - |> ExAws.S3.upload(bucket, s3_name, [ - {:acl, :public_read}, - {:content_type, upload.content_type} - ]) - - if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do - # set s3 upload timeout to respect :upload pool timeout - # timeout should be slightly larger, so s3 can retry upload on fail - timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000 - opts = Keyword.put(op.opts, :timeout, timeout) - Map.put(op, :opts, opts) - else - op - end + upload.tempfile + |> ExAws.S3.Upload.stream_file() + |> ExAws.S3.upload(bucket, s3_name, [ + {:acl, :public_read}, + {:content_type, upload.content_type} + ]) else {:ok, file_data} = File.read(upload.tempfile) diff --git a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex index c95d35bb9..d517f671b 100644 --- a/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/media_proxy_warming_policy.ex @@ -12,7 +12,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do require Logger @adapter_options [ - pool: :media, recv_timeout: 10_000 ] diff --git a/lib/pleroma/web/media_proxy/media_proxy_controller.ex b/lib/pleroma/web/media_proxy/media_proxy_controller.ex index d2ad62c13..85aa7dea5 100644 --- a/lib/pleroma/web/media_proxy/media_proxy_controller.ex +++ b/lib/pleroma/web/media_proxy/media_proxy_controller.ex @@ -54,7 +54,7 @@ defp handle_preview(conn, url) do media_proxy_url = MediaProxy.url(url) with {:ok, %{status: status} = head_response} when status in 200..299 <- - Pleroma.HTTP.request("HEAD", media_proxy_url, [], [], pool: :media) do + Pleroma.HTTP.request("HEAD", media_proxy_url, [], [], name: MyFinch) do content_type = Tesla.get_header(head_response, "content-type") content_length = Tesla.get_header(head_response, "content-length") content_length = content_length && String.to_integer(content_length) diff --git a/lib/pleroma/web/plugs/uploaded_media.ex b/lib/pleroma/web/plugs/uploaded_media.ex index ad8143234..3af4f93dc 100644 --- a/lib/pleroma/web/plugs/uploaded_media.ex +++ b/lib/pleroma/web/plugs/uploaded_media.ex @@ -89,8 +89,7 @@ defp get_media(conn, {:static_dir, directory}, _, opts) do defp get_media(conn, {:url, url}, true, _) do proxy_opts = [ http: [ - follow_redirect: true, - pool: :upload + follow_redirect: true ] ] diff --git a/lib/pleroma/web/rel_me.ex b/lib/pleroma/web/rel_me.ex index 98fbc1c59..847b99091 100644 --- a/lib/pleroma/web/rel_me.ex +++ b/lib/pleroma/web/rel_me.ex @@ -4,7 +4,6 @@ defmodule Pleroma.Web.RelMe do @options [ - pool: :media, max_body: 2_000_000, recv_timeout: 2_000 ] diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex index 0488df30e..4bd2e521e 100644 --- a/lib/pleroma/web/rich_media/helpers.ex +++ b/lib/pleroma/web/rich_media/helpers.ex @@ -10,7 +10,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do alias Pleroma.Web.RichMedia.Parser @options [ - pool: :media, max_body: 2_000_000, recv_timeout: 2_000 ] diff --git a/mix.exs b/mix.exs index 3f8d2c3a7..c95187212 100644 --- a/mix.exs +++ b/mix.exs @@ -138,7 +138,7 @@ defp deps do {:calendar, "~> 1.0"}, {:cachex, "~> 3.2"}, {:poison, "~> 3.0", override: true}, - {:tesla, "~> 1.4.0", override: true}, + {:tesla, "~> 1.4.4", override: true}, {:castore, "~> 0.1"}, {:cowlib, "~> 2.9", override: true}, {:gun, "~> 2.0.0-rc.1", override: true}, diff --git a/test/pleroma/config/deprecation_warnings_test.exs b/test/pleroma/config/deprecation_warnings_test.exs index f3453ddb0..55d03b70f 100644 --- a/test/pleroma/config/deprecation_warnings_test.exs +++ b/test/pleroma/config/deprecation_warnings_test.exs @@ -336,50 +336,6 @@ test "check_uploders_s3_public_endpoint/0" do "Your config is using the old setting for controlling the URL of media uploaded to your S3 bucket." end - describe "check_gun_pool_options/0" do - test "await_up_timeout" do - config = Config.get(:connections_pool) - clear_config(:connections_pool, Keyword.put(config, :await_up_timeout, 5_000)) - - assert capture_log(fn -> - DeprecationWarnings.check_gun_pool_options() - end) =~ - "Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`." - end - - test "pool timeout" do - old_config = [ - federation: [ - size: 50, - max_waiting: 10, - timeout: 10_000 - ], - media: [ - size: 50, - max_waiting: 10, - timeout: 10_000 - ], - upload: [ - size: 25, - max_waiting: 5, - timeout: 15_000 - ], - default: [ - size: 10, - max_waiting: 2, - timeout: 5_000 - ] - ] - - clear_config(:pools, old_config) - - assert capture_log(fn -> - DeprecationWarnings.check_gun_pool_options() - end) =~ - "Your config is using old setting name `timeout` instead of `recv_timeout` in pool settings" - end - end - test "check_old_chat_shoutbox/0" do clear_config([:instance, :chat_limit], 1_000) clear_config([:chat, :enabled], true) diff --git a/test/pleroma/reverse_proxy_test.exs b/test/pleroma/reverse_proxy_test.exs index 0bd4db8d1..54e860b6c 100644 --- a/test/pleroma/reverse_proxy_test.exs +++ b/test/pleroma/reverse_proxy_test.exs @@ -8,44 +8,10 @@ defmodule Pleroma.ReverseProxyTest do import Mox alias Pleroma.ReverseProxy - alias Pleroma.ReverseProxy.ClientMock alias Plug.Conn - setup_all do - {:ok, _} = Registry.start_link(keys: :unique, name: ClientMock) - :ok - end - - setup :verify_on_exit! - - defp request_mock(invokes) do - ClientMock - |> expect(:request, fn :get, url, headers, _body, _opts -> - Registry.register(ClientMock, url, 0) - body = headers |> Enum.into(%{}) |> Jason.encode!() - - {:ok, 200, - [ - {"content-type", "application/json"}, - {"content-length", byte_size(body) |> to_string()} - ], %{url: url, body: body}} - end) - |> expect(:stream_body, invokes, fn %{url: url, body: body} = client -> - case Registry.lookup(ClientMock, url) do - [{_, 0}] -> - Registry.update_value(ClientMock, url, &(&1 + 1)) - {:ok, body, client} - - [{_, 1}] -> - Registry.unregister(ClientMock, url) - :done - end - end) - end - describe "reverse proxy" do test "do not track successful request", %{conn: conn} do - request_mock(2) url = "/success" conn = ReverseProxy.call(conn, url) @@ -56,8 +22,6 @@ test "do not track successful request", %{conn: conn} do end test "use Pleroma's user agent in the request; don't pass the client's", %{conn: conn} do - request_mock(2) - conn = conn |> Plug.Conn.put_req_header("user-agent", "fake/1.0") @@ -110,8 +74,6 @@ defp stream_mock(invokes, with_close? \\ false) do describe "max_body" do test "length returns error if content-length more than option", %{conn: conn} do - request_mock(0) - assert capture_log(fn -> ReverseProxy.call(conn, "/huge-file", max_body_length: 4) end) =~ diff --git a/test/test_helper.exs b/test/test_helper.exs index 60a61484f..d2016ff53 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -7,9 +7,6 @@ Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, :manual) -Mox.defmock(Pleroma.ReverseProxy.ClientMock, for: Pleroma.ReverseProxy.Client) -Mox.defmock(Pleroma.GunMock, for: Pleroma.Gun) - {:ok, _} = Application.ensure_all_started(:ex_machina) ExUnit.after_suite(fn _results -> -- cgit v1.2.3