logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

worker.ex (7019B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      5 defmodule Pleroma.Web.Streamer.Worker do
      6   use GenServer
      7 
      8   require Logger
      9 
     10   alias Pleroma.Activity
     11   alias Pleroma.Config
     12   alias Pleroma.Conversation.Participation
     13   alias Pleroma.Notification
     14   alias Pleroma.Object
     15   alias Pleroma.User
     16   alias Pleroma.Web.ActivityPub.ActivityPub
     17   alias Pleroma.Web.ActivityPub.Visibility
     18   alias Pleroma.Web.CommonAPI
     19   alias Pleroma.Web.Streamer.State
     20   alias Pleroma.Web.Streamer.StreamerSocket
     21   alias Pleroma.Web.StreamerView
     22 
     23   def start_link(_) do
     24     GenServer.start_link(__MODULE__, %{}, [])
     25   end
     26 
     27   def init(init_arg) do
     28     {:ok, init_arg}
     29   end
     30 
     31   def stream(pid, topics, items) do
     32     GenServer.call(pid, {:stream, topics, items})
     33   end
     34 
     35   def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
     36     Enum.each(topics, fn t ->
     37       do_stream(%{topic: t, item: item})
     38     end)
     39 
     40     {:reply, state, state}
     41   end
     42 
     43   def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
     44     Enum.each(items, fn i ->
     45       do_stream(%{topic: topic, item: i})
     46     end)
     47 
     48     {:reply, state, state}
     49   end
     50 
     51   def handle_call({:stream, topic, item}, _from, state) do
     52     do_stream(%{topic: topic, item: item})
     53 
     54     {:reply, state, state}
     55   end
     56 
     57   defp do_stream(%{topic: "direct", item: item}) do
     58     recipient_topics =
     59       User.get_recipients_from_activity(item)
     60       |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
     61 
     62     Enum.each(recipient_topics, fn user_topic ->
     63       Logger.debug("Trying to push direct message to #{user_topic}\n\n")
     64       push_to_socket(State.get_sockets(), user_topic, item)
     65     end)
     66   end
     67 
     68   defp do_stream(%{topic: "participation", item: participation}) do
     69     user_topic = "direct:#{participation.user_id}"
     70     Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
     71 
     72     push_to_socket(State.get_sockets(), user_topic, participation)
     73   end
     74 
     75   defp do_stream(%{topic: "list", item: item}) do
     76     # filter the recipient list if the activity is not public, see #270.
     77     recipient_lists =
     78       case Visibility.is_public?(item) do
     79         true ->
     80           Pleroma.List.get_lists_from_activity(item)
     81 
     82         _ ->
     83           Pleroma.List.get_lists_from_activity(item)
     84           |> Enum.filter(fn list ->
     85             owner = User.get_cached_by_id(list.user_id)
     86 
     87             Visibility.visible_for_user?(item, owner)
     88           end)
     89       end
     90 
     91     recipient_topics =
     92       recipient_lists
     93       |> Enum.map(fn %{id: id} -> "list:#{id}" end)
     94 
     95     Enum.each(recipient_topics, fn list_topic ->
     96       Logger.debug("Trying to push message to #{list_topic}\n\n")
     97       push_to_socket(State.get_sockets(), list_topic, item)
     98     end)
     99   end
    100 
    101   defp do_stream(%{topic: topic, item: %Notification{} = item})
    102        when topic in ["user", "user:notification"] do
    103     State.get_sockets()
    104     |> Map.get("#{topic}:#{item.user_id}", [])
    105     |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
    106       with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
    107            true <- should_send?(user, item) do
    108         send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
    109       end
    110     end)
    111   end
    112 
    113   defp do_stream(%{topic: "user", item: item}) do
    114     Logger.debug("Trying to push to users")
    115 
    116     recipient_topics =
    117       User.get_recipients_from_activity(item)
    118       |> Enum.map(fn %{id: id} -> "user:#{id}" end)
    119 
    120     Enum.each(recipient_topics, fn topic ->
    121       push_to_socket(State.get_sockets(), topic, item)
    122     end)
    123   end
    124 
    125   defp do_stream(%{topic: topic, item: item}) do
    126     Logger.debug("Trying to push to #{topic}")
    127     Logger.debug("Pushing item to #{topic}")
    128     push_to_socket(State.get_sockets(), topic, item)
    129   end
    130 
    131   defp should_send?(%User{} = user, %Activity{} = item) do
    132     %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
    133       User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
    134 
    135     recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
    136     recipients = MapSet.new(item.recipients)
    137     domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
    138 
    139     with parent <- Object.normalize(item) || item,
    140          true <-
    141            Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
    142          true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
    143          true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
    144          true <- MapSet.disjoint?(recipients, recipient_blocks),
    145          %{host: item_host} <- URI.parse(item.actor),
    146          %{host: parent_host} <- URI.parse(parent.data["actor"]),
    147          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
    148          false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
    149          true <- thread_containment(item, user),
    150          false <- CommonAPI.thread_muted?(user, item) do
    151       true
    152     else
    153       _ -> false
    154     end
    155   end
    156 
    157   defp should_send?(%User{} = user, %Notification{activity: activity}) do
    158     should_send?(user, activity)
    159   end
    160 
    161   def push_to_socket(topics, topic, %Participation{} = participation) do
    162     Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
    163       send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
    164     end)
    165   end
    166 
    167   def push_to_socket(topics, topic, %Activity{
    168         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
    169       }) do
    170     Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
    171       send(
    172         transport_pid,
    173         {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
    174       )
    175     end)
    176   end
    177 
    178   def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
    179 
    180   def push_to_socket(topics, topic, item) do
    181     Enum.each(topics[topic] || [], fn %StreamerSocket{
    182                                         transport_pid: transport_pid,
    183                                         user: socket_user
    184                                       } ->
    185       # Get the current user so we have up-to-date blocks etc.
    186       if socket_user do
    187         user = User.get_cached_by_ap_id(socket_user.ap_id)
    188 
    189         if should_send?(user, item) do
    190           send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
    191         end
    192       else
    193         send(transport_pid, {:text, StreamerView.render("update.json", item)})
    194       end
    195     end)
    196   end
    197 
    198   @spec thread_containment(Activity.t(), User.t()) :: boolean()
    199   defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
    200 
    201   defp thread_containment(activity, user) do
    202     if Config.get([:instance, :skip_thread_containment]) do
    203       true
    204     else
    205       ActivityPub.contain_activity(activity, user)
    206     end
    207   end
    208 end