worker.ex (7019B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Streamer.Worker do
  @moduledoc """
  GenServer worker that pushes streamed items to the sockets subscribed to a topic.

  For every socket registered under a topic in `Pleroma.Web.Streamer.State`, the
  worker sends a `{:text, payload}` message to the socket's `transport_pid`.
  Sockets that carry a user are additionally filtered through the user's blocks,
  mutes, domain blocks, thread containment and thread mutes before anything is
  sent.
  """

  use GenServer

  require Logger

  alias Pleroma.Activity
  alias Pleroma.Config
  alias Pleroma.Conversation.Participation
  alias Pleroma.Notification
  alias Pleroma.Object
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.ActivityPub
  alias Pleroma.Web.ActivityPub.Visibility
  alias Pleroma.Web.CommonAPI
  alias Pleroma.Web.Streamer.State
  alias Pleroma.Web.Streamer.StreamerSocket
  alias Pleroma.Web.StreamerView

  @doc """
  Starts an unnamed worker process with an empty map as its state.

  The argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, [])
  end

  @impl true
  def init(init_arg) do
    {:ok, init_arg}
  end

  @doc """
  Synchronously streams `items` to `topics` through the worker `pid`.

  `topics` may be a single topic or a list of topics; `items` may be a single
  item or a list of items. When `topics` is a list, `items` is treated as ONE
  item that is streamed to every topic (even if it is itself a list).

  Returns the worker's state, which is what the server replies with.
  """
  @spec stream(GenServer.server(), term(), term()) :: term()
  def stream(pid, topics, items) do
    GenServer.call(pid, {:stream, topics, items})
  end

  # Clause order matters: a list of topics takes precedence over a list of
  # items, so `{:stream, [t1, t2], [i1, i2]}` streams the whole list `[i1, i2]`
  # as a single item to each topic.
  @impl true
  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
    Enum.each(topics, fn t ->
      do_stream(%{topic: t, item: item})
    end)

    {:reply, state, state}
  end

  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
    Enum.each(items, fn i ->
      do_stream(%{topic: topic, item: i})
    end)

    {:reply, state, state}
  end

  def handle_call({:stream, topic, item}, _from, state) do
    do_stream(%{topic: topic, item: item})

    {:reply, state, state}
  end

  # "direct": fan the item out to the per-user "direct:<id>" topic of every
  # recipient of the activity.
  defp do_stream(%{topic: "direct", item: item}) do
    recipient_topics =
      User.get_recipients_from_activity(item)
      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)

    Enum.each(recipient_topics, fn user_topic ->
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
      push_to_socket(State.get_sockets(), user_topic, item)
    end)
  end

  # "participation": push the participation to its owner's "direct:<id>" topic.
  defp do_stream(%{topic: "participation", item: participation}) do
    user_topic = "direct:#{participation.user_id}"
    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

    push_to_socket(State.get_sockets(), user_topic, participation)
  end

  # "list": push the item to the "list:<id>" topic of every list that the
  # activity belongs to.
  defp do_stream(%{topic: "list", item: item}) do
    # filter the recipient list if the activity is not public, see #270.
    recipient_lists =
      case Visibility.is_public?(item) do
        true ->
          Pleroma.List.get_lists_from_activity(item)

        _ ->
          Pleroma.List.get_lists_from_activity(item)
          |> Enum.filter(fn list ->
            owner = User.get_cached_by_id(list.user_id)

            Visibility.visible_for_user?(item, owner)
          end)
      end

    recipient_topics =
      recipient_lists
      |> Enum.map(fn %{id: id} -> "list:#{id}" end)

    Enum.each(recipient_topics, fn list_topic ->
      Logger.debug("Trying to push message to #{list_topic}\n\n")
      push_to_socket(State.get_sockets(), list_topic, item)
    end)
  end

  # Notifications on "user" / "user:notification": send the rendered
  # notification to the sockets registered under "<topic>:<user_id>", but only
  # when the socket's user can still be loaded and passes `should_send?/2`.
  defp do_stream(%{topic: topic, item: %Notification{} = item})
       when topic in ["user", "user:notification"] do
    State.get_sockets()
    |> Map.get("#{topic}:#{item.user_id}", [])
    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
           true <- should_send?(user, item) do
        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
      end
    end)
  end

  # "user" (non-notification items): fan the item out to the "user:<id>" topic
  # of every recipient of the activity.
  defp do_stream(%{topic: "user", item: item}) do
    Logger.debug("Trying to push to users")

    recipient_topics =
      User.get_recipients_from_activity(item)
      |> Enum.map(fn %{id: id} -> "user:#{id}" end)

    Enum.each(recipient_topics, fn topic ->
      push_to_socket(State.get_sockets(), topic, item)
    end)
  end

  # Any other topic: push the item to the sockets registered under that exact
  # topic name.
  defp do_stream(%{topic: topic, item: item}) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")
    push_to_socket(State.get_sockets(), topic, item)
  end

  # Decides whether `item` may be delivered to `user`. Returns `true` only when
  # every check below passes, `false` otherwise:
  #   * the item's actor is neither blocked nor muted by the user;
  #   * the item is not an "Announce" from an actor whose reblogs are muted;
  #   * the actor of the parent (the normalized object, or the item itself when
  #     normalization yields nothing) is neither blocked nor muted;
  #   * none of the item's recipients is blocked or muted;
  #   * neither the item actor's host nor the parent actor's host matches the
  #     user's domain blocks;
  #   * thread containment allows it and the thread is not muted by the user.
  defp should_send?(%User{} = user, %Activity{} = item) do
    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
    recipients = MapSet.new(item.recipients)
    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

    with parent <- Object.normalize(item) || item,
         true <-
           Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
         true <- MapSet.disjoint?(recipients, recipient_blocks),
         %{host: item_host} <- URI.parse(item.actor),
         %{host: parent_host} <- URI.parse(parent.data["actor"]),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
         true <- thread_containment(item, user),
         false <- CommonAPI.thread_muted?(user, item) do
      true
    else
      _ -> false
    end
  end

  # A notification is deliverable exactly when its underlying activity is.
  defp should_send?(%User{} = user, %Notification{activity: activity}) do
    should_send?(user, activity)
  end

  @doc """
  Sends `item` to every socket registered under `topic` in `topics`.

  `topics` is the topic-to-sockets collection (looked up with `topics[topic]`;
  a missing topic is treated as having no sockets). What is sent depends on the
  item:

    * a `Participation` is rendered with `"conversation.json"` and sent to every
      socket without filtering;
    * a `"Delete"` activity carrying `"deleted_activity_id"` is sent as a JSON
      `delete` event whose payload is that id as a string;
    * any other `"Delete"` activity is ignored and `:noop` is returned;
    * anything else is rendered with `"update.json"`. Sockets without a user get
      it unconditionally; sockets with a user get it only if the user can still
      be loaded and the item passes the user's block/mute/containment filters.
  """
  def push_to_socket(topics, topic, %Participation{} = participation) do
    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
    end)
  end

  def push_to_socket(topics, topic, %Activity{
        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
      }) do
    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
      send(
        transport_pid,
        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
      )
    end)
  end

  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

  def push_to_socket(topics, topic, item) do
    Enum.each(topics[topic] || [], fn %StreamerSocket{
                                        transport_pid: transport_pid,
                                        user: socket_user
                                      } ->
      if socket_user do
        # Get the current user so we have up-to-date blocks etc.
        # The lookup may not yield a `%User{}`; in that case skip this socket
        # instead of crashing the worker with a FunctionClauseError in
        # `should_send?/2` (same guard as the notification clause of
        # `do_stream/1`).
        with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
             true <- should_send?(user, item) do
          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
        end
      else
        send(transport_pid, {:text, StreamerView.render("update.json", item)})
      end
    end)
  end

  # Thread containment check. Always passes when the user has
  # `skip_thread_containment` set, or when the instance-wide
  # `[:instance, :skip_thread_containment]` setting is truthy; otherwise the
  # decision is delegated to `ActivityPub.contain_activity/2`.
  @spec thread_containment(Activity.t(), User.t()) :: boolean()
  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

  defp thread_containment(activity, user) do
    if Config.get([:instance, :skip_thread_containment]) do
      true
    else
      ActivityPub.contain_activity(activity, user)
    end
  end
end