summaryrefslogtreecommitdiff
path: root/lib/pleroma/web/mastodon_api/websocket_handler.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pleroma/web/mastodon_api/websocket_handler.ex')
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex67
1 files changed, 49 insertions, 18 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index b1aebe014..94e4595d8 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -12,8 +12,12 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket
- # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
- @timeout :infinity
+ # Client ping period.
+ @tick :timer.seconds(30)
+ # Cowboy timeout period.
+ @timeout :timer.seconds(60)
+ # Hibernate every X messages
+ @hibernate_every 100
def init(%{qs: qs} = req, state) do
with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
@@ -28,7 +32,8 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
+ %{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
@@ -43,28 +48,54 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
def websocket_init(state) do
- send(self(), :subscribe)
- {:ok, state}
- end
-
- # We never receive messages.
- def websocket_handle(_frame, state) do
- {:ok, state}
- end
-
- def websocket_info(:subscribe, state) do
Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic}"
)
- Streamer.add_socket(state.topic, streamer_socket(state))
+ Streamer.add_socket(state.topic, state.user)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # Client's Pong frame.
+ def websocket_handle(:pong, state) do
+ if state.timer, do: Process.cancel_timer(state.timer)
+ {:ok, %{state | timer: timer()}}
+ end
+
+ # We never receive messages.
+ def websocket_handle(frame, state) do
+ Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
+ def websocket_info({:render_with_user, view, template, item}, state) do
+ user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
+
+ unless Streamer.filtered_by_user?(user, item) do
+ websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ else
+ {:ok, state}
+ end
+ end
+
def websocket_info({:text, message}, state) do
- {:reply, {:text, message}, state}
+ # If the websocket processed X messages, force an hibernate/GC.
+ # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+ if state.count > @hibernate_every do
+ {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+ else
+ {:reply, {:text, message}, %{state | count: state.count + 1}}
+ end
+ end
+
+ # Ping tick. We don't re-queue a timer there, it is instead queued when :pong is received.
+ # As we hibernate there, reset the count to 0.
+ # If the client misses :pong, Cowboy will automatically timeout the connection after
+ # `@idle_timeout`.
+ def websocket_info(:tick, state) do
+ {:reply, :ping, %{state | timer: nil, count: 0}, :hibernate}
end
def terminate(reason, _req, state) do
@@ -74,7 +105,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic)
:ok
end
@@ -96,7 +127,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
end
end
- defp streamer_socket(state) do
- %{transport_pid: self(), assigns: state}
+ defp timer do
+ Process.send_after(self(), :tick, @tick)
end
end