summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortusooa <tusooa@kazv.moe>2023-03-31 22:55:52 -0400
committertusooa <tusooa@kazv.moe>2023-10-15 17:19:49 -0400
commit273cda63ad79b61f4d37e4b7603694908e894e4f (patch)
tree236ab9d78d98292dc8e0b7b06940fb8b4714b55f
parent2b5636bf127b1987736c281d346a5771c8168c09 (diff)
Allow subscribing to streams
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex74
-rw-r--r--lib/pleroma/web/views/streamer_view.ex18
-rw-r--r--test/pleroma/integration/mastodon_websocket_test.exs90
3 files changed, 182 insertions, 0 deletions
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 97a1f1401..a42a9a63c 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.StreamerView
@behaviour :cowboy_websocket
@@ -73,6 +74,16 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state}
+ def websocket_handle({:text, text}, state) do
+ with {:ok, %{} = event} <- Jason.decode(text) do
+ handle_client_event(event, state)
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
+ {:ok, state}
+ end
+ end
+
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
@@ -144,4 +155,67 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
defp timer do
Process.send_after(self(), :tick, @tick)
end
+
+ defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, false} <- {:subscribed, topic in state.topics} do
+ Streamer.add_socket(topic, state.oauth_token)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})}
+ ], %{state | topics: [topic | state.topics]}}
+ else
+ {:subscribed, true} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "subscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, true} <- {:subscribed, topic in state.topics} do
+ Streamer.remove_socket(topic)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})}
+ ], %{state | topics: List.delete(state.topics, topic)}}
+ else
+ {:subscribed, false} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "unsubscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(params, state) do
+ Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}")
+ {[], state}
+ end
end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 6a55242b0..19f098783 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -135,4 +135,22 @@ defmodule Pleroma.Web.StreamerView do
}
|> Jason.encode!()
end
+
+ def render("pleroma_respond.json", %{type: type, result: result} = params) do
+ %{
+ event: "pleroma.respond",
+ payload:
+ %{
+ result: result,
+ type: type
+ }
+ |> Map.merge(maybe_error(params))
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"}
+ defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"}
+ defp maybe_error(_), do: %{}
end
diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs
index 3b120c10e..9db0f714f 100644
--- a/test/pleroma/integration/mastodon_websocket_test.exs
+++ b/test/pleroma/integration/mastodon_websocket_test.exs
@@ -31,6 +31,13 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
WebsocketClient.start_link(self(), path, headers)
end
+ defp decode_json(json) do
+ with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
+ {:ok, payload} <- Jason.decode(payload_text) do
+ {:ok, %{"event" => event, "payload" => payload}}
+ end
+ end
+
test "refuses invalid requests" do
capture_log(fn ->
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
@@ -79,6 +86,89 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert json == view_json
end
+ describe "subscribing via WebSocket" do
+ test "can subscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma.respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+
+ assert_receive {:text, raw_json}, 1_000
+ assert {:ok, json} = Jason.decode(raw_json)
+
+ assert "update" == json["event"]
+ assert json["payload"]
+ assert {:ok, json} = Jason.decode(json["payload"])
+
+ view_json =
+ Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
+ |> Jason.encode!()
+ |> Jason.decode!()
+
+ assert json == view_json
+ end
+
+ test "won't double subscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma.respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma.respond",
+ "payload" => %{"type" => "subscribe", "result" => "ignored"}
+ }} = decode_json(raw_json)
+
+ {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+
+ assert_receive {:text, _}, 1_000
+ refute_receive {:text, _}, 1_000
+ end
+
+ test "can unsubscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma.respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma.respond",
+ "payload" => %{"type" => "unsubscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+ refute_receive {:text, _}, 1_000
+ end
+ end
+
describe "with a valid user token" do
setup do
{:ok, app} =