summaryrefslogtreecommitdiff
path: root/lib/pleroma/conversation.ex
blob: e76eb008746514f854533eb6fe4a23ff7fa15aac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Conversation do
  @moduledoc """
  Ecto schema and helper functions for the `conversations` table.

  A conversation is identified by `ap_id` (the ActivityPub context id) and is
  linked to users through `Pleroma.Conversation.Participation` records.
  """

  alias Pleroma.Conversation.Participation
  alias Pleroma.Conversation.Participation.RecipientShip
  alias Pleroma.Object
  alias Pleroma.Repo
  alias Pleroma.User
  use Ecto.Schema
  import Ecto.Changeset

  schema "conversations" do
    # This is the context ap id.
    field(:ap_id, :string)
    has_many(:participations, Participation)
    has_many(:users, through: [:participations, :user])

    timestamps()
  end

  @doc """
  Builds the changeset used to create a conversation.

  Only `:ap_id` is cast from `params`; it is required and is covered by a
  unique constraint.
  """
  def creation_cng(struct, params) do
    struct
    |> cast(params, [:ap_id])
    |> validate_required([:ap_id])
    |> unique_constraint(:ap_id)
  end

  @doc """
  Inserts a conversation for `ap_id`, or bumps `updated_at` of the existing row
  when a conversation with that `ap_id` is already present.

  Returns the result of `Repo.insert/2`.
  """
  def create_for_ap_id(ap_id) do
    %__MODULE__{}
    |> creation_cng(%{ap_id: ap_id})
    |> Repo.insert(
      # Upsert: a conflict on :ap_id only refreshes the timestamp, and
      # `returning: true` reads the stored row back into the returned struct.
      on_conflict: [set: [updated_at: NaiveDateTime.utc_now()]],
      returning: true,
      conflict_target: :ap_id
    )
  end

  @doc """
  Fetches the conversation with the given `ap_id`, or `nil` if there is none.
  """
  def get_for_ap_id(ap_id) do
    Repo.get_by(__MODULE__, ap_id: ap_id)
  end

  @doc """
  Creates recipientships for `participation` from the recipients of `activity`,
  but only when the participation has no recipients yet.

  Returns the result of `RecipientShip.create/2` when recipientships are
  created, and `nil` when the participation already has recipients.
  """
  def maybe_create_recipientships(participation, activity) do
    participation = Repo.preload(participation, :recipients)

    if Enum.empty?(participation.recipients) do
      recipients = User.get_all_by_ap_id(activity.recipients)
      RecipientShip.create(recipients, participation)
    end
  end

  @doc """
  This will
  1. Create a conversation if there isn't one already
  2. Create a participation for all the people involved who don't have one already
  3. Bump all relevant participations to 'unread'

  Only direct "Create" activities whose object is a "Note" or "Question" with a
  non-empty binary `"context"` are processed. `opts` is forwarded to
  `Participation.create_for_user_and_conversation/3` with
  `:invisible_conversation` added for each user.

  Returns `{:ok, conversation}` with `participations` populated, or
  `{:error, value}` where `value` is the first precondition result that did not
  match (including a failed object normalization).
  """
  def create_or_bump_for(activity, opts \\ []) do
    with true <- Pleroma.Web.ActivityPub.Visibility.is_direct?(activity),
         "Create" <- activity.data["type"],
         # Require an actual object struct here: any other normalization result
         # (e.g. nil) must take the `else` branch instead of raising on `.data`.
         %Object{} = object <- Object.normalize(activity),
         true <- object.data["type"] in ["Note", "Question"],
         ap_id when is_binary(ap_id) and byte_size(ap_id) > 0 <- object.data["context"] do
      {:ok, conversation} = create_for_ap_id(ap_id)

      users = User.get_users_from_set(activity.recipients, local_only: false)

      participations =
        Enum.map(users, fn user ->
          # The conversation is flagged invisible for this user when
          # `User.blocks?/2` is true for the user and any of the recipients.
          invisible_conversation = Enum.any?(users, &User.blocks?(user, &1))

          if not invisible_conversation do
            User.increment_unread_conversation_count(conversation, user)
          end

          opts = Keyword.put(opts, :invisible_conversation, invisible_conversation)

          {:ok, participation} =
            Participation.create_for_user_and_conversation(user, conversation, opts)

          maybe_create_recipientships(participation, activity)
          participation
        end)

      {:ok, %{conversation | participations: participations}}
    else
      e -> {:error, e}
    end
  end

  @doc """
  This is only meant to be run by a mix task. It creates conversations/participations for all direct messages in the database.

  Every activity is processed with `read: true`, and the result of each
  individual `create_or_bump_for/2` call is discarded.
  """
  def bump_for_all_activities do
    # The stream is lazy; it is only enumerated inside the transaction below,
    # which is where `Repo.stream/2` has to be consumed.
    stream =
      Pleroma.Web.ActivityPub.ActivityPub.fetch_direct_messages_query()
      |> Repo.stream()

    Repo.transaction(
      fn -> Enum.each(stream, &create_or_bump_for(&1, read: true)) end,
      timeout: :infinity
    )
  end
end