summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortusooa <tusooa@kazv.moe>2023-10-15 21:35:30 +0000
committertusooa <tusooa@kazv.moe>2023-10-15 21:35:30 +0000
commit340c8812963a71f5f04b12a4c540bad3871f87fd (patch)
tree4875d29472b3c0325b0301c290b35071b3085ec3
parent14b1b9c9b0cc3a799e8aa8ad1e54ec1b6b3243d8 (diff)
parenteb33a03d0ad8571e3f0f3e0c5e9af39158c72a09 (diff)
Merge branch 'tusooa/3018-unified-stream' into 'develop'
Unified streaming endpoint Closes #3018 See merge request pleroma/pleroma!3864
-rw-r--r--changelog.d/unified-streaming.add1
-rw-r--r--docs/development/API/differences_in_mastoapi_responses.md116
-rw-r--r--lib/pleroma/constants.ex4
-rw-r--r--lib/pleroma/web/api_spec.ex10
-rw-r--r--lib/pleroma/web/api_spec/operations/streaming_operation.ex464
-rw-r--r--lib/pleroma/web/mastodon_api/websocket_handler.ex131
-rw-r--r--lib/pleroma/web/streamer.ex34
-rw-r--r--lib/pleroma/web/views/streamer_view.ex61
-rw-r--r--test/pleroma/integration/mastodon_websocket_test.exs352
-rw-r--r--test/pleroma/notification_test.exs4
-rw-r--r--test/pleroma/web/streamer_test.exs72
11 files changed, 1187 insertions, 62 deletions
diff --git a/changelog.d/unified-streaming.add b/changelog.d/unified-streaming.add
new file mode 100644
index 000000000..84821fcc8
--- /dev/null
+++ b/changelog.d/unified-streaming.add
@@ -0,0 +1 @@
+Add unified streaming endpoint
diff --git a/docs/development/API/differences_in_mastoapi_responses.md b/docs/development/API/differences_in_mastoapi_responses.md
index 4007c63c8..48a9c104c 100644
--- a/docs/development/API/differences_in_mastoapi_responses.md
+++ b/docs/development/API/differences_in_mastoapi_responses.md
@@ -357,6 +357,122 @@ The message payload consist of:
- `follower_count`: follower count
- `following_count`: following count
+### Authenticating via `sec-websocket-protocol` header
+
+Pleroma allows to authenticate via the `sec-websocket-protocol` header, for example, if your access token is `your-access-token`, you can authenticate using the following:
+
+```
+sec-websocket-protocol: your-access-token
+```
+
+### Authenticating after connection via `pleroma:authenticate` event
+
+Pleroma allows to authenticate after connection is established, via the `pleroma:authenticate` event. For example, if your access token is `your-access-token`, you can send the following after the connection is established:
+
+```
+{"type": "pleroma:authenticate", "token": "your-access-token"}
+```
+
+### Response to client-sent events
+
+Pleroma will respond to client-sent events that it recognizes. Supported event types are:
+
+- `subscribe`
+- `unsubscribe`
+- `pleroma:authenticate`
+
+The reply will be in the following format:
+
+```
+{
+ "event": "pleroma:respond",
+ "payload": "{\"type\": \"<type of the client-sent event>\", \"result\": \"<result of the action>\", \"error\": \"<error code>\"}"
+}
+```
+
+Result of the action can be either `success`, `ignored` or `error`. If it is `error`, the `error` property will contain the error code. Otherwise, the `error` property will not be present. Below are some examples:
+
+```
+{
+ "event": "pleroma:respond",
+ "payload": "{\"type\": \"pleroma:authenticate\", \"result\": \"success\"}"
+}
+
+{
+ "event": "pleroma:respond",
+ "payload": "{\"type\": \"subscribe\", \"result\": \"ignored\"}"
+}
+
+{
+ "event": "pleroma:respond",
+ "payload": "{\"type\": \"unsubscribe\", \"result\": \"error\", \"error\": \"bad_topic\"}"
+}
+```
+
+If the sent event is not of a type that Pleroma supports, it will not reply.
+
+### The `stream` attribute of a server-sent event
+
+Technically, this is in Mastodon, but its documentation does nothing to specify its format.
+
+This attribute appears on every event type except `pleroma:respond` and `delete`. It helps clients determine where they should display the new statuses.
+
+The value of the attribute is an array containing one or two elements. The first element is the type of the stream. The second is the identifier related to that specific stream, if applicable.
+
+For the following stream types, there is a second element in the array:
+
+- `list`: The second element is the id of the list, as a string.
+- `hashtag`: The second element is the name of the hashtag.
+- `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance.
+
+For all other stream types, there is no second element.
+
+Some examples of valid `stream` values:
+
+- `["list", "1"]`: List of id 1.
+- `["hashtag", "mew"]`: The hashtag #mew.
+- `["user:notifications"]`: Notifications for the current user.
+- `["user"]`: Home timeline.
+- `["public:remote", "mew.moe"]`: Public posts from the instance mew.moe .
+
+### The unified streaming endpoint
+
+If you do not specify a stream to connect to when requesting `/api/v1/streaming`, you will enter a connection that subscribes to no streams. After the connection is established, you can authenticate and then subscribe to different streams.
+
+### List of supported streams
+
+Below is a list of supported streams by Pleroma. To make a single-stream WebSocket connection, append the string specified in "Query style" to the streaming endpoint url.
+To subscribe to a stream after the connection is established, merge the JSON object specified in "Subscribe style" with `{"type": "subscribe"}`. To unsubscribe, merge it with `{"type": "unsubscribe"}`.
+
+For example, to receive updates on the list 1, you can connect to `/api/v1/streaming/?stream=list&list=1`, or send
+
+```
+{"type": "subscribe", "stream": "list", "list": "1"}
+```
+
+upon establishing the websocket connection.
+
+To unsubscribe to list 1, send
+
+```
+{"type": "unsubscribe", "stream": "list", "list": "1"}
+```
+
+Note that if you specify a stream that requires a logged-in user in the query string (for example, `user` or `list`), you have to specify the access token when you are trying to establish the connection, i.e. in the query string or via the `sec-websocket-protocol` header.
+
+- `list`
+ - Query style: `?stream=list&list=<id>`
+ - Subscribe style: `{"stream": "list", "list": "<id>"}`
+- `public`, `public:local`, `public:media`, `public:local:media`, `user`, `user:pleroma_chat`, `user:notifications`, `direct`
+ - Query style: `?stream=<stream name>`
+ - Subscribe style: `{"stream": "<stream name>"}`
+- `hashtag`
+ - Query style: `?stream=hashtag&tag=<name>`
+ - Subscribe style: `{"stream": "hashtag", "tag": "<name>"}`
+- `public:remote`, `public:remote:media`
+ - Query style: `?stream=<stream name>&instance=<instance domain>`
+ - Subscribe style: `{"stream": "<stream name>", "instance": "<instance domain>"}`
+
## User muting and thread muting
Both user muting and thread muting can be done for only a certain time by adding an `expires_in` parameter to the API calls and giving the expiration time in seconds.
diff --git a/lib/pleroma/constants.ex b/lib/pleroma/constants.ex
index ed60bcc37..77bc4bfac 100644
--- a/lib/pleroma/constants.ex
+++ b/lib/pleroma/constants.ex
@@ -94,4 +94,8 @@ defmodule Pleroma.Constants do
"application/activity+json"
]
)
+
+ const(public_streams,
+ do: ["public", "public:local", "public:media", "public:local:media"]
+ )
end
diff --git a/lib/pleroma/web/api_spec.ex b/lib/pleroma/web/api_spec.ex
index 2d56dc643..163226ce5 100644
--- a/lib/pleroma/web/api_spec.ex
+++ b/lib/pleroma/web/api_spec.ex
@@ -10,6 +10,14 @@ defmodule Pleroma.Web.ApiSpec do
@behaviour OpenApi
+ defp streaming_paths do
+ %{
+ "/api/v1/streaming" => %OpenApiSpex.PathItem{
+ get: Pleroma.Web.ApiSpec.StreamingOperation.streaming_operation()
+ }
+ }
+ end
+
@impl OpenApi
def spec(opts \\ []) do
%OpenApi{
@@ -45,7 +53,7 @@ defmodule Pleroma.Web.ApiSpec do
}
},
# populate the paths from a phoenix router
- paths: OpenApiSpex.Paths.from_router(Router),
+ paths: Map.merge(streaming_paths(), OpenApiSpex.Paths.from_router(Router)),
components: %OpenApiSpex.Components{
parameters: %{
"accountIdOrNickname" =>
diff --git a/lib/pleroma/web/api_spec/operations/streaming_operation.ex b/lib/pleroma/web/api_spec/operations/streaming_operation.ex
new file mode 100644
index 000000000..b580bc2f0
--- /dev/null
+++ b/lib/pleroma/web/api_spec/operations/streaming_operation.ex
@@ -0,0 +1,464 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Web.ApiSpec.StreamingOperation do
+ alias OpenApiSpex.Operation
+ alias OpenApiSpex.Response
+ alias OpenApiSpex.Schema
+ alias Pleroma.Web.ApiSpec.NotificationOperation
+ alias Pleroma.Web.ApiSpec.Schemas.Chat
+ alias Pleroma.Web.ApiSpec.Schemas.Conversation
+ alias Pleroma.Web.ApiSpec.Schemas.FlakeID
+ alias Pleroma.Web.ApiSpec.Schemas.Status
+
+ require Pleroma.Constants
+
+ @spec open_api_operation(atom) :: Operation.t()
+ def open_api_operation(action) do
+ operation = String.to_existing_atom("#{action}_operation")
+ apply(__MODULE__, operation, [])
+ end
+
+ @spec streaming_operation() :: Operation.t()
+ def streaming_operation do
+ %Operation{
+ tags: ["Timelines"],
+ summary: "Establish streaming connection",
+ description: """
+ Receive statuses in real-time via WebSocket.
+
+ You can specify the access token on the query string or through the `sec-websocket-protocol` header. Using
+ the query string to authenticate is considered unsafe and should not be used unless you have to (e.g. to maintain
+ your client's compatibility with Mastodon).
+
+ You may specify a stream on the query string. If you do so and you are connecting to a stream that requires logged-in users,
+ you must specify the access token at the time of the connection (i.e. via query string or header).
+
+ Otherwise, you have the option to authenticate after you have established the connection through client-sent events.
+
+ The "Request body" section below describes what events clients can send through WebSocket, and the "Responses" section
+ describes what events server will send through WebSocket.
+ """,
+ security: [%{"oAuth" => ["read:statuses", "read:notifications"]}],
+ operationId: "WebsocketHandler.streaming",
+ parameters:
+ [
+ Operation.parameter(:connection, :header, %Schema{type: :string}, "connection header",
+ required: true
+ ),
+ Operation.parameter(:upgrade, :header, %Schema{type: :string}, "upgrade header",
+ required: true
+ ),
+ Operation.parameter(
+ :"sec-websocket-key",
+ :header,
+ %Schema{type: :string},
+ "sec-websocket-key header",
+ required: true
+ ),
+ Operation.parameter(
+ :"sec-websocket-version",
+ :header,
+ %Schema{type: :string},
+ "sec-websocket-version header",
+ required: true
+ )
+ ] ++ stream_params() ++ access_token_params(),
+ requestBody: request_body("Client-sent events", client_sent_events()),
+ responses: %{
+ 101 => switching_protocols_response(),
+ 200 =>
+ Operation.response(
+ "Server-sent events",
+ "application/json",
+ server_sent_events()
+ )
+ }
+ }
+ end
+
+ defp stream_params do
+ stream_specifier()
+ |> Enum.map(fn {name, schema} ->
+ Operation.parameter(name, :query, schema, get_schema(schema).description)
+ end)
+ end
+
+ defp access_token_params do
+ [
+ Operation.parameter(:access_token, :query, token(), token().description),
+ Operation.parameter(:"sec-websocket-protocol", :header, token(), token().description)
+ ]
+ end
+
+ defp switching_protocols_response do
+ %Response{
+ description: "Switching protocols",
+ headers: %{
+ "connection" => %OpenApiSpex.Header{required: true},
+ "upgrade" => %OpenApiSpex.Header{required: true},
+ "sec-websocket-accept" => %OpenApiSpex.Header{required: true}
+ }
+ }
+ end
+
+ defp server_sent_events do
+ %Schema{
+ oneOf: [
+ update_event(),
+ status_update_event(),
+ notification_event(),
+ chat_update_event(),
+ follow_relationships_update_event(),
+ conversation_event(),
+ delete_event(),
+ pleroma_respond_event()
+ ]
+ }
+ end
+
+ defp stream do
+ %Schema{
+ type: :array,
+ title: "Stream",
+ description: """
+ The stream identifier.
+ The first item is the name of the stream. If the stream needs a differentiator, the second item will be the corresponding identifier.
+ Currently, for the following stream types, there is a second element in the array:
+
+ - `list`: The second element is the id of the list, as a string.
+ - `hashtag`: The second element is the name of the hashtag.
+ - `public:remote:media` and `public:remote`: The second element is the domain of the corresponding instance.
+ """,
+ maxItems: 2,
+ minItems: 1,
+ items: %Schema{type: :string},
+ example: ["hashtag", "mew"]
+ }
+ end
+
+ defp get_schema(%Schema{} = schema), do: schema
+ defp get_schema(schema), do: schema.schema
+
+ defp server_sent_event_helper(name, description, type, payload, opts \\ []) do
+ payload_type = Keyword.get(opts, :payload_type, :json)
+ has_stream = Keyword.get(opts, :has_stream, true)
+
+ stream_properties =
+ if has_stream do
+ %{stream: stream()}
+ else
+ %{}
+ end
+
+ stream_example = if has_stream, do: %{"stream" => get_schema(stream()).example}, else: %{}
+
+ stream_required = if has_stream, do: [:stream], else: []
+
+ payload_schema =
+ if payload_type == :json do
+ %Schema{
+ title: "Event payload",
+ description: "JSON-encoded string of #{get_schema(payload).title}",
+ allOf: [payload]
+ }
+ else
+ payload
+ end
+
+ payload_example =
+ if payload_type == :json do
+ get_schema(payload).example |> Jason.encode!()
+ else
+ get_schema(payload).example
+ end
+
+ %Schema{
+ type: :object,
+ title: name,
+ description: description,
+ required: [:event, :payload] ++ stream_required,
+ properties:
+ %{
+ event: %Schema{
+ title: "Event type",
+ description: "Type of the event.",
+ type: :string,
+ required: true,
+ enum: [type]
+ },
+ payload: payload_schema
+ }
+ |> Map.merge(stream_properties),
+ example:
+ %{
+ "event" => type,
+ "payload" => payload_example
+ }
+ |> Map.merge(stream_example)
+ }
+ end
+
+ defp update_event do
+ server_sent_event_helper("New status", "A newly-posted status.", "update", Status)
+ end
+
+ defp status_update_event do
+ server_sent_event_helper("Edit", "A status that was just edited", "status.update", Status)
+ end
+
+ defp notification_event do
+ server_sent_event_helper(
+ "Notification",
+ "A new notification.",
+ "notification",
+ NotificationOperation.notification()
+ )
+ end
+
+ defp follow_relationships_update_event do
+ server_sent_event_helper(
+ "Follow relationships update",
+ "An update to follow relationships.",
+ "pleroma:follow_relationships_update",
+ %Schema{
+ type: :object,
+ title: "Follow relationships update",
+ required: [:state, :follower, :following],
+ properties: %{
+ state: %Schema{
+ type: :string,
+ description: "Follow state of the relationship.",
+ enum: ["follow_pending", "follow_accept", "follow_reject", "unfollow"]
+ },
+ follower: %Schema{
+ type: :object,
+ description: "Information about the follower.",
+ required: [:id, :follower_count, :following_count],
+ properties: %{
+ id: FlakeID,
+ follower_count: %Schema{type: :integer},
+ following_count: %Schema{type: :integer}
+ }
+ },
+ following: %Schema{
+ type: :object,
+ description: "Information about the following person.",
+ required: [:id, :follower_count, :following_count],
+ properties: %{
+ id: FlakeID,
+ follower_count: %Schema{type: :integer},
+ following_count: %Schema{type: :integer}
+ }
+ }
+ },
+ example: %{
+ "state" => "follow_pending",
+ "follower" => %{
+ "id" => "someUser1",
+ "follower_count" => 1,
+ "following_count" => 1
+ },
+ "following" => %{
+ "id" => "someUser2",
+ "follower_count" => 1,
+ "following_count" => 1
+ }
+ }
+ }
+ )
+ end
+
+ defp chat_update_event do
+ server_sent_event_helper(
+ "Chat update",
+ "A new chat message.",
+ "pleroma:chat_update",
+ Chat
+ )
+ end
+
+ defp conversation_event do
+ server_sent_event_helper(
+ "Conversation update",
+ "An update about a conversation",
+ "conversation",
+ Conversation
+ )
+ end
+
+ defp delete_event do
+ server_sent_event_helper(
+ "Delete",
+ "A status that was just deleted.",
+ "delete",
+ %Schema{
+ type: :string,
+ title: "Status id",
+ description: "Id of the deleted status",
+ allOf: [FlakeID],
+ example: "some-opaque-id"
+ },
+ payload_type: :string,
+ has_stream: false
+ )
+ end
+
+ defp pleroma_respond_event do
+ server_sent_event_helper(
+ "Server response",
+ "A response to a client-sent event.",
+ "pleroma:respond",
+ %Schema{
+ type: :object,
+ title: "Results",
+ required: [:result, :type],
+ properties: %{
+ result: %Schema{
+ type: :string,
+ title: "Result of the request",
+ enum: ["success", "error", "ignored"]
+ },
+ error: %Schema{
+ type: :string,
+ title: "Error code",
+ description: "An error identifier. Only appears if `result` is `error`."
+ },
+ type: %Schema{
+ type: :string,
+ description: "Type of the request."
+ }
+ },
+ example: %{"result" => "success", "type" => "pleroma:authenticate"}
+ },
+ has_stream: false
+ )
+ end
+
+ defp client_sent_events do
+ %Schema{
+ oneOf: [
+ subscribe_event(),
+ unsubscribe_event(),
+ authenticate_event()
+ ]
+ }
+ end
+
+ defp request_body(description, schema, opts \\ []) do
+ %OpenApiSpex.RequestBody{
+ description: description,
+ content: %{
+ "application/json" => %OpenApiSpex.MediaType{
+ schema: schema,
+ example: opts[:example],
+ examples: opts[:examples]
+ }
+ }
+ }
+ end
+
+ defp client_sent_event_helper(name, description, type, properties, opts) do
+ required = opts[:required] || []
+
+ %Schema{
+ type: :object,
+ title: name,
+ required: [:type] ++ required,
+ description: description,
+ properties:
+ %{
+ type: %Schema{type: :string, enum: [type], description: "Type of the event."}
+ }
+ |> Map.merge(properties),
+ example: opts[:example]
+ }
+ end
+
+ defp subscribe_event do
+ client_sent_event_helper(
+ "Subscribe",
+ "Subscribe to a stream.",
+ "subscribe",
+ stream_specifier(),
+ required: [:stream],
+ example: %{"type" => "subscribe", "stream" => "list", "list" => "1"}
+ )
+ end
+
+ defp unsubscribe_event do
+ client_sent_event_helper(
+ "Unsubscribe",
+ "Unsubscribe from a stream.",
+ "unsubscribe",
+ stream_specifier(),
+ required: [:stream],
+ example: %{
+ "type" => "unsubscribe",
+ "stream" => "public:remote:media",
+ "instance" => "example.org"
+ }
+ )
+ end
+
+ defp authenticate_event do
+ client_sent_event_helper(
+ "Authenticate",
+ "Authenticate via an access token.",
+ "pleroma:authenticate",
+ %{
+ token: token()
+ },
+ required: [:token]
+ )
+ end
+
+ defp token do
+ %Schema{
+ type: :string,
+ description: "An OAuth access token with corresponding permissions.",
+ example: "some token"
+ }
+ end
+
+ defp stream_specifier do
+ %{
+ stream: %Schema{
+ type: :string,
+ description: "The name of the stream.",
+ enum:
+ Pleroma.Constants.public_streams() ++
+ [
+ "public:remote",
+ "public:remote:media",
+ "user",
+ "user:pleroma_chat",
+ "user:notification",
+ "direct",
+ "list",
+ "hashtag"
+ ]
+ },
+ list: %Schema{
+ type: :string,
+ title: "List id",
+ description: "The id of the list. Required when `stream` is `list`.",
+ example: "some-id"
+ },
+ tag: %Schema{
+ type: :string,
+ title: "Hashtag name",
+ description: "The name of the hashtag. Required when `stream` is `hashtag`.",
+ example: "mew"
+ },
+ instance: %Schema{
+ type: :string,
+ title: "Domain name",
+ description:
+ "Domain name of the instance. Required when `stream` is `public:remote` or `public:remote:media`.",
+ example: "example.org"
+ }
+ }
+ end
+end
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 88444106d..07c2b62e3 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.User
alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
+ alias Pleroma.Web.StreamerView
@behaviour :cowboy_websocket
@@ -32,8 +33,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
+ topics =
+ if topic do
+ [topic]
+ else
+ []
+ end
+
{:cowboy_websocket, req,
- %{user: user, topic: topic, oauth_token: oauth_token, count: 0, timer: nil},
+ %{user: user, topics: topics, oauth_token: oauth_token, count: 0, timer: nil},
%{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
@@ -50,10 +58,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def websocket_init(state) do
Logger.debug(
- "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic}"
+ "#{__MODULE__} accepted websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics}"
)
- Streamer.add_socket(state.topic, state.oauth_token)
+ Enum.each(state.topics, fn topic -> Streamer.add_socket(topic, state.oauth_token) end)
{:ok, %{state | timer: timer()}}
end
@@ -66,16 +74,26 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
# We only receive pings for now
def websocket_handle(:ping, state), do: {:ok, state}
+ def websocket_handle({:text, text}, state) do
+ with {:ok, %{} = event} <- Jason.decode(text) do
+ handle_client_event(event, state)
+ else
+ _ ->
+ Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}")
+ {:ok, state}
+ end
+ end
+
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
- def websocket_info({:render_with_user, view, template, item}, state) do
+ def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do
- websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
+ websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else
{:ok, state}
end
@@ -109,10 +127,10 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
def terminate(reason, _req, state) do
Logger.debug(
- "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topic #{state.topic || "?"}: #{inspect(reason)}"
+ "#{__MODULE__} terminating websocket connection for user #{(state.user || %{id: "anonymous"}).id}, topics #{state.topics || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic)
+ Enum.each(state.topics, fn topic -> Streamer.remove_socket(topic) end)
:ok
end
@@ -137,4 +155,103 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
defp timer do
Process.send_after(self(), :tick, @tick)
end
+
+ defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, false} <- {:subscribed, topic in state.topics} do
+ Streamer.add_socket(topic, state.oauth_token)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})}
+ ], %{state | topics: [topic | state.topics]}}
+ else
+ {:subscribed, true} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "subscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do
+ with {_, {:ok, topic}} <-
+ {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)},
+ {_, true} <- {:subscribed, topic in state.topics} do
+ Streamer.remove_socket(topic)
+
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})}
+ ], %{state | topics: List.delete(state.topics, topic)}}
+ else
+ {:subscribed, false} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})}
+ ], state}
+
+ {:topic, {:error, error}} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "unsubscribe",
+ result: "error",
+ error: error
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(
+ %{"type" => "pleroma:authenticate", "token" => access_token} = _params,
+ state
+ ) do
+ with {:auth, nil, nil} <- {:auth, state.user, state.oauth_token},
+ {:ok, user, oauth_token} <- authenticate_request(access_token, nil) do
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "success"
+ })}
+ ], %{state | user: user, oauth_token: oauth_token}}
+ else
+ {:auth, _, _} ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "error",
+ error: :already_authenticated
+ })}
+ ], state}
+
+ _ ->
+ {[
+ {:text,
+ StreamerView.render("pleroma_respond.json", %{
+ type: "pleroma:authenticate",
+ result: "error",
+ error: :unauthorized
+ })}
+ ], state}
+ end
+ end
+
+ defp handle_client_event(params, state) do
+ Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}")
+ {[], state}
+ end
end
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
index b9a04cc76..48ca82421 100644
--- a/lib/pleroma/web/streamer.ex
+++ b/lib/pleroma/web/streamer.ex
@@ -4,6 +4,7 @@
defmodule Pleroma.Web.Streamer do
require Logger
+ require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.Chat.MessageReference
@@ -24,7 +25,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry
- @public_streams ["public", "public:local", "public:media", "public:local:media"]
+ @public_streams Pleroma.Constants.public_streams()
@local_streams ["public:local", "public:local:media"]
@user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@@ -59,10 +60,14 @@ defmodule Pleroma.Web.Streamer do
end
@doc "Expand and authorizes a stream"
- @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
- {:ok, topic :: String.t()} | {:error, :bad_topic}
+ @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, Map.t()) ::
+ {:ok, topic :: String.t() | nil} | {:error, :bad_topic}
def get_topic(stream, user, oauth_token, params \\ %{})
+ def get_topic(nil = _stream, _user, _oauth_token, _params) do
+ {:ok, nil}
+ end
+
# Allow all public steams if the instance allows unauthenticated access.
# Otherwise, only allow users with valid oauth tokens.
def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
@@ -219,8 +224,8 @@ defmodule Pleroma.Web.Streamer do
end
defp do_stream("follow_relationship", item) do
- text = StreamerView.render("follow_relationships_update.json", item)
user_topic = "user:#{item.follower.id}"
+ text = StreamerView.render("follow_relationships_update.json", item, user_topic)
Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
@@ -266,9 +271,11 @@ defmodule Pleroma.Web.Streamer do
defp do_stream(topic, %Notification{} = item)
when topic in ["user", "user:notification"] do
- Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ user_topic = "#{topic}:#{item.user_id}"
+
+ Registry.dispatch(@registry, user_topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
- send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ send(pid, {:render_with_user, StreamerView, "notification.json", item, user_topic})
end)
end)
end
@@ -277,7 +284,7 @@ defmodule Pleroma.Web.Streamer do
when topic in ["user", "user:pleroma_chat"] do
topic = "#{topic}:#{user.id}"
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+ text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
@@ -305,7 +312,7 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, %Participation{} = participation) do
- rendered = StreamerView.render("conversation.json", participation)
+ rendered = StreamerView.render("conversation.json", participation, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
@@ -333,12 +340,15 @@ defmodule Pleroma.Web.Streamer do
Pleroma.Activity.get_create_by_object_ap_id(item.object.data["id"])
|> Map.put(:object, item.object)
- anon_render = StreamerView.render("status_update.json", create_activity)
+ anon_render = StreamerView.render("status_update.json", create_activity, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
- send(pid, {:render_with_user, StreamerView, "status_update.json", create_activity})
+ send(
+ pid,
+ {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
+ )
else
send(pid, {:text, anon_render})
end
@@ -347,12 +357,12 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, item) do
- anon_render = StreamerView.render("update.json", item)
+ anon_render = StreamerView.render("update.json", item, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
- send(pid, {:render_with_user, StreamerView, "update.json", item})
+ send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
else
send(pid, {:text, anon_render})
end
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
index 6a55242b0..f97570b0a 100644
--- a/lib/pleroma/web/views/streamer_view.ex
+++ b/lib/pleroma/web/views/streamer_view.ex
@@ -11,8 +11,11 @@ defmodule Pleroma.Web.StreamerView do
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
- def render("update.json", %Activity{} = activity, %User{} = user) do
+ require Pleroma.Constants
+
+ def render("update.json", %Activity{} = activity, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -25,8 +28,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("status_update.json", %Activity{} = activity, %User{} = user) do
+ def render("status_update.json", %Activity{} = activity, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "status.update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -39,8 +43,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("notification.json", %Notification{} = notify, %User{} = user) do
+ def render("notification.json", %Notification{} = notify, %User{} = user, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "notification",
payload:
NotificationView.render(
@@ -52,8 +57,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("update.json", %Activity{} = activity) do
+ def render("update.json", %Activity{} = activity, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -65,8 +71,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("status_update.json", %Activity{} = activity) do
+ def render("status_update.json", %Activity{} = activity, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "status.update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@@ -78,7 +85,7 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("chat_update.json", %{chat_message_reference: cm_ref}) do
+ def render("chat_update.json", %{chat_message_reference: cm_ref}, topic) do
# Explicitly giving the cmr for the object here, so we don't accidentally
# send a later 'last_message' that was inserted between inserting this and
# streaming it out
@@ -93,6 +100,7 @@ defmodule Pleroma.Web.StreamerView do
)
%{
+ stream: render("stream.json", %{topic: topic}),
event: "pleroma:chat_update",
payload:
representation
@@ -101,8 +109,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("follow_relationships_update.json", item) do
+ def render("follow_relationships_update.json", item, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "pleroma:follow_relationships_update",
payload:
%{
@@ -123,8 +132,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
- def render("conversation.json", %Participation{} = participation) do
+ def render("conversation.json", %Participation{} = participation, topic) do
%{
+ stream: render("stream.json", %{topic: topic}),
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
@@ -135,4 +145,39 @@ defmodule Pleroma.Web.StreamerView do
}
|> Jason.encode!()
end
+
+ def render("pleroma_respond.json", %{type: type, result: result} = params) do
+ %{
+ event: "pleroma:respond",
+ payload:
+ %{
+ result: result,
+ type: type
+ }
+ |> Map.merge(maybe_error(params))
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
+ def render("stream.json", %{topic: "user:pleroma_chat:" <> _}), do: ["user:pleroma_chat"]
+ def render("stream.json", %{topic: "user:notification:" <> _}), do: ["user:notification"]
+ def render("stream.json", %{topic: "user:" <> _}), do: ["user"]
+ def render("stream.json", %{topic: "direct:" <> _}), do: ["direct"]
+ def render("stream.json", %{topic: "list:" <> id}), do: ["list", id]
+ def render("stream.json", %{topic: "hashtag:" <> tag}), do: ["hashtag", tag]
+
+ def render("stream.json", %{topic: "public:remote:media:" <> instance}),
+ do: ["public:remote:media", instance]
+
+ def render("stream.json", %{topic: "public:remote:" <> instance}),
+ do: ["public:remote", instance]
+
+ def render("stream.json", %{topic: stream}) when stream in Pleroma.Constants.public_streams(),
+ do: [stream]
+
+ defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"}
+ defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"}
+ defp maybe_error(%{error: :already_authenticated}), do: %{error: "already_authenticated"}
+ defp maybe_error(_), do: %{}
end
diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs
index 9be0445c0..a2c20f0a6 100644
--- a/test/pleroma/integration/mastodon_websocket_test.exs
+++ b/test/pleroma/integration/mastodon_websocket_test.exs
@@ -31,9 +31,22 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
WebsocketClient.start_link(self(), path, headers)
end
+ defp decode_json(json) do
+ with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
+ {:ok, payload} <- Jason.decode(payload_text) do
+ {:ok, %{"event" => event, "payload" => payload}}
+ end
+ end
+
+ # Turns atom keys to strings
+ defp atom_key_to_string(json) do
+ json
+ |> Jason.encode!()
+ |> Jason.decode!()
+ end
+
test "refuses invalid requests" do
capture_log(fn ->
- assert {:error, %WebSockex.RequestError{code: 404}} = start_socket()
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
Process.sleep(30)
end)
@@ -49,6 +62,10 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end)
end
+ test "allows unified stream" do
+ assert {:ok, _} = start_socket()
+ end
+
test "allows public streams without authentication" do
assert {:ok, _} = start_socket("?stream=public")
assert {:ok, _} = start_socket("?stream=public:local")
@@ -70,12 +87,143 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
view_json =
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
- |> Jason.encode!()
- |> Jason.decode!()
+ |> atom_key_to_string()
assert json == view_json
end
+ describe "subscribing via WebSocket" do
+ test "can subscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+
+ assert_receive {:text, raw_json}, 1_000
+ assert {:ok, json} = Jason.decode(raw_json)
+
+ assert "update" == json["event"]
+ assert json["payload"]
+ assert {:ok, json} = Jason.decode(json["payload"])
+
+ view_json =
+ Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
+ |> Jason.encode!()
+ |> Jason.decode!()
+
+ assert json == view_json
+ end
+
+ test "can subscribe to multiple streams" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber #mew"})
+
+ assert_receive {:text, raw_json}, 1_000
+ assert {:ok, %{"stream" => stream1}} = Jason.decode(raw_json)
+ assert_receive {:text, raw_json}, 1_000
+ assert {:ok, %{"stream" => stream2}} = Jason.decode(raw_json)
+
+ streams = [stream1, stream2]
+ assert ["hashtag", "mew"] in streams
+ assert ["public"] in streams
+ end
+
+ test "won't double subscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "ignored"}
+ }} = decode_json(raw_json)
+
+ {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+
+ assert_receive {:text, _}, 1_000
+ refute_receive {:text, _}, 1_000
+ end
+
+ test "rejects invalid streams" do
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "nonsense"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"}
+ }} = decode_json(raw_json)
+ end
+
+ test "can unsubscribe" do
+ user = insert(:user)
+ {:ok, pid} = start_socket()
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "unsubscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
+ refute_receive {:text, _}, 1_000
+ end
+ end
+
describe "with a valid user token" do
setup do
{:ok, app} =
@@ -131,6 +279,124 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
end)
end
+ test "accepts valid token on client-sent event", %{token: token} do
+ assert {:ok, pid} = start_socket()
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!())
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+ end
+
+ test "rejects invalid token on client-sent event" do
+ assert {:ok, pid} = start_socket()
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{
+ "type" => "pleroma:authenticate",
+ "result" => "error",
+ "error" => "unauthorized"
+ }
+ }} = decode_json(raw_json)
+ end
+
+ test "rejects new authenticate request if already logged-in", %{token: token} do
+ assert {:ok, pid} = start_socket()
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{
+ "type" => "pleroma:authenticate",
+ "result" => "error",
+ "error" => "already_authenticated"
+ }
+ }} = decode_json(raw_json)
+ end
+
+ test "accepts the 'list' stream", %{token: token, user: user} do
+ posting_user = insert(:user)
+
+ {:ok, list} = Pleroma.List.create("test", user)
+ Pleroma.List.follow(list, posting_user)
+
+ assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}")
+
+ assert {:ok, pid} = start_socket("?access_token=#{token.token}")
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "subscribe", stream: "list", list: list.id} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "success"}
+ }} = decode_json(raw_json)
+
+ WebsocketClient.send_text(
+ pid,
+ %{type: "subscribe", stream: "list", list: to_string(list.id)} |> Jason.encode!()
+ )
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "pleroma:respond",
+ "payload" => %{"type" => "subscribe", "result" => "ignored"}
+ }} = decode_json(raw_json)
+ end
+
test "disconnect when token is revoked", %{app: app, user: user, token: token} do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
@@ -146,5 +412,85 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
assert_receive {:close, _}
refute_receive {:close, _}
end
+
+ test "receives private statuses", %{user: reading_user, token: token} do
+ user = insert(:user)
+ CommonAPI.follow(reading_user, user)
+
+ {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
+
+ {:ok, activity} =
+ CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
+
+ assert_receive {:text, raw_json}, 1_000
+ assert {:ok, json} = Jason.decode(raw_json)
+
+ assert "update" == json["event"]
+ assert json["payload"]
+ assert {:ok, json} = Jason.decode(json["payload"])
+
+ view_json =
+ Pleroma.Web.MastodonAPI.StatusView.render("show.json",
+ activity: activity,
+ for: reading_user
+ )
+ |> Jason.encode!()
+ |> Jason.decode!()
+
+ assert json == view_json
+ end
+
+ test "receives edits", %{user: reading_user, token: token} do
+ user = insert(:user)
+ CommonAPI.follow(reading_user, user)
+
+ {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
+
+ {:ok, activity} =
+ CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
+
+ assert_receive {:text, _raw_json}, 1_000
+
+ {:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"})
+
+ assert_receive {:text, raw_json}, 1_000
+
+ activity = Pleroma.Activity.normalize(activity)
+
+ view_json =
+ Pleroma.Web.MastodonAPI.StatusView.render("show.json",
+ activity: activity,
+ for: reading_user
+ )
+ |> Jason.encode!()
+ |> Jason.decode!()
+
+ assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
+ end
+
+ test "receives notifications", %{user: reading_user, token: token} do
+ user = insert(:user)
+ CommonAPI.follow(reading_user, user)
+
+ {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
+
+ {:ok, %Pleroma.Activity{id: activity_id} = _activity} =
+ CommonAPI.post(user, %{
+ status: "nice echo chamber @#{reading_user.nickname}",
+ visibility: "private"
+ })
+
+ assert_receive {:text, raw_json}, 1_000
+
+ assert {:ok,
+ %{
+ "event" => "notification",
+ "payload" => %{
+ "status" => %{
+ "id" => ^activity_id
+ }
+ }
+ }} = decode_json(raw_json)
+ end
end
end
diff --git a/test/pleroma/notification_test.exs b/test/pleroma/notification_test.exs
index e55aa3a08..71af9acb8 100644
--- a/test/pleroma/notification_test.exs
+++ b/test/pleroma/notification_test.exs
@@ -252,7 +252,7 @@ defmodule Pleroma.NotificationTest do
task =
Task.async(fn ->
{:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
- assert_receive {:render_with_user, _, _, _}, 4_000
+ assert_receive {:render_with_user, _, _, _, _}, 4_000
end)
task_user_notification =
@@ -260,7 +260,7 @@ defmodule Pleroma.NotificationTest do
{:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
- assert_receive {:render_with_user, _, _, _}, 4_000
+ assert_receive {:render_with_user, _, _, _, _}, 4_000
end)
activity = insert(:note_activity)
diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs
index 7ab0e379b..d85358fd4 100644
--- a/test/pleroma/web/streamer_test.exs
+++ b/test/pleroma/web/streamer_test.exs
@@ -22,6 +22,10 @@ defmodule Pleroma.Web.StreamerTest do
setup do: clear_config([:instance, :skip_thread_containment])
describe "get_topic/_ (unauthenticated)" do
+ test "allows no stream" do
+ assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
+ end
+
test "allows public" do
assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
@@ -242,7 +246,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"})
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity)
end
@@ -253,7 +257,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
{:ok, announce} = CommonAPI.repeat(activity.id, user)
- assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+ assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
refute Streamer.filtered_by_user?(user, announce)
end
@@ -306,7 +310,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
- assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+ assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
refute Streamer.filtered_by_user?(user, announce)
end
@@ -318,7 +322,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", notify)
- assert_receive {:render_with_user, _, _, ^notify}
+ assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify)
end
@@ -330,7 +334,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify)
- assert_receive {:render_with_user, _, _, ^notify}
+ assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify)
end
@@ -351,7 +355,12 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
Streamer.stream("user:pleroma_chat", {user, cm_ref})
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+ text =
+ StreamerView.render(
+ "chat_update.json",
+ %{chat_message_reference: cm_ref},
+ "user:pleroma_chat:#{user.id}"
+ )
assert text =~ "hey cirno"
assert_receive {:text, ^text}
@@ -369,7 +378,12 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", {user, cm_ref})
- text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
+ text =
+ StreamerView.render(
+ "chat_update.json",
+ %{chat_message_reference: cm_ref},
+ "user:#{user.id}"
+ )
assert text =~ "hey cirno"
assert_receive {:text, ^text}
@@ -390,7 +404,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify)
- assert_receive {:render_with_user, _, _, ^notify}
+ assert_receive {:render_with_user, _, _, ^notify, _}
refute Streamer.filtered_by_user?(user, notify)
end
@@ -436,7 +450,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
- assert_receive {:render_with_user, _, "notification.json", notif}
+ assert_receive {:render_with_user, _, "notification.json", notif, _}
assert notif.activity.id == favorite_activity.id
refute Streamer.filtered_by_user?(user, notif)
end
@@ -465,7 +479,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
- assert_receive {:render_with_user, _, "notification.json", notif}
+ assert_receive {:render_with_user, _, "notification.json", notif, _}
assert notif.activity.id == follow_activity.id
refute Streamer.filtered_by_user?(user, notif)
end
@@ -530,7 +544,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})
create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
- assert_receive {:render_with_user, _, "status_update.json", ^create}
+ assert_receive {:render_with_user, _, "status_update.json", ^create, _}
refute Streamer.filtered_by_user?(user, edited)
end
@@ -541,7 +555,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})
create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
- assert_receive {:render_with_user, _, "status_update.json", ^create}
+ assert_receive {:render_with_user, _, "status_update.json", ^create, _}
refute Streamer.filtered_by_user?(user, edited)
end
end
@@ -554,7 +568,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(other_user, activity)
end
@@ -654,7 +668,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user, activity)
end
@@ -676,7 +690,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity)
end
@@ -699,7 +713,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user, activity)
end
end
@@ -713,7 +727,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user, activity)
end
@@ -730,17 +744,17 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
- assert_receive {:render_with_user, _, _, ^activity_one}
+ assert_receive {:render_with_user, _, _, ^activity_one, _}
assert Streamer.filtered_by_user?(blocker, activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
- assert_receive {:render_with_user, _, _, ^activity_two}
+ assert_receive {:render_with_user, _, _, ^activity_two, _}
assert Streamer.filtered_by_user?(blocker, activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
- assert_receive {:render_with_user, _, _, ^activity_three}
+ assert_receive {:render_with_user, _, _, ^activity_three, _}
assert Streamer.filtered_by_user?(blocker, activity_three)
end
end
@@ -801,7 +815,7 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "private"
})
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
refute Streamer.filtered_by_user?(user_a, activity)
end
end
@@ -819,7 +833,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
- assert_receive {:render_with_user, _, _, ^announce_activity}
+ assert_receive {:render_with_user, _, _, ^announce_activity, _}
assert Streamer.filtered_by_user?(user1, announce_activity)
end
@@ -835,7 +849,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
- assert_receive {:render_with_user, _, "notification.json", notif}
+ assert_receive {:render_with_user, _, "notification.json", notif, _}
assert Streamer.filtered_by_user?(user1, notif)
end
@@ -851,7 +865,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
- assert_receive {:render_with_user, _, "notification.json", notif}
+ assert_receive {:render_with_user, _, "notification.json", notif, _}
refute Streamer.filtered_by_user?(user1, notif)
end
end
@@ -866,7 +880,7 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity)
- assert_receive {:render_with_user, _, _, ^activity}
+ assert_receive {:render_with_user, _, _, ^activity, _}
assert Streamer.filtered_by_user?(user2, activity)
end
end
@@ -908,7 +922,7 @@ defmodule Pleroma.Web.StreamerTest do
})
create_activity_id = create_activity.id
- assert_receive {:render_with_user, _, _, ^create_activity}
+ assert_receive {:render_with_user, _, _, ^create_activity, _}
assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
@@ -943,8 +957,8 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "direct"
})
- assert_receive {:render_with_user, _, _, ^create_activity}
- assert_receive {:render_with_user, _, _, ^create_activity2}
+ assert_receive {:render_with_user, _, _, ^create_activity, _}
+ assert_receive {:render_with_user, _, _, ^create_activity2, _}
assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
assert_receive {:text, received_conversation1}
@@ -973,7 +987,7 @@ defmodule Pleroma.Web.StreamerTest do
receive do
{StreamerTest, :ready} ->
- assert_receive {:render_with_user, _, "update.json", _}
+ assert_receive {:render_with_user, _, "update.json", _, _}
receive do
{StreamerTest, :revoked} -> finalize.()