logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

streamer.ex (10135B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Streamer do
  5. require Logger
  6. alias Pleroma.Activity
  7. alias Pleroma.Chat.MessageReference
  8. alias Pleroma.Config
  9. alias Pleroma.Conversation.Participation
  10. alias Pleroma.Notification
  11. alias Pleroma.Object
  12. alias Pleroma.Plugs.OAuthScopesPlug
  13. alias Pleroma.User
  14. alias Pleroma.Web.ActivityPub.ActivityPub
  15. alias Pleroma.Web.ActivityPub.Visibility
  16. alias Pleroma.Web.CommonAPI
  17. alias Pleroma.Web.OAuth.Token
  18. alias Pleroma.Web.StreamerView
  19. @mix_env Mix.env()
  20. @registry Pleroma.Web.StreamerRegistry
  @doc "Returns the name of the `Registry` that streaming processes are registered in."
  def registry do
    @registry
  end
  22. @public_streams ["public", "public:local", "public:media", "public:local:media"]
  23. @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
  @doc """
  Expands and authorizes a stream, and registers the process for streaming.

  The stream is resolved into a topic with `get_topic/4`. On success the topic
  is handed to `add_socket/2`; any other result of `get_topic/4` is returned
  unchanged.
  """
  @spec get_topic_and_add_socket(
          stream :: String.t(),
          User.t() | nil,
          Token.t() | nil,
          map() | nil
        ) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
    # A non-matching result (i.e. an error tuple) falls through `with` as-is.
    with {:ok, topic} <- get_topic(stream, user, oauth_token, params) do
      add_socket(topic, user)
    end
  end
  @doc """
  Expands and authorizes a stream.

  Returns `{:ok, topic}` with the full topic name when the stream is allowed:

    * public streams are returned as-is;
    * `"hashtag"` becomes `"hashtag:<tag>"`, taken from the `"tag"` param;
    * user streams become `"<stream>:<user id>"` when the token belongs to the
      user and carries a sufficient read scope;
    * `"list"` becomes `"list:<id>"`, taken from the `"list"` param, when the
      token belongs to the user, carries a sufficient read scope and
      `Pleroma.List.get/2` finds the list for that user.

  Returns `{:error, :unauthorized}` when a user or list stream is requested
  but the user/token pair, the scopes or (for lists) the `"list"` param do not
  satisfy the clauses above. Returns `{:error, :bad_topic}` for a list that
  cannot be found and for every other stream name.
  """
  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, map() | nil) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, oauth_token, params \\ %{})

  # Allow all public streams.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
    {:ok, stream}
  end

  # Allow all hashtags streams.
  # The guard keeps a non-binary "tag" param from raising in `<>`; such a
  # request falls through to the final clause and yields `:bad_topic`.
  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
      when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end

  # Expand user streams.
  def get_topic(
        stream,
        %User{id: user_id},
        %Token{user_id: token_user_id} = oauth_token,
        _params
      )
      when stream in @user_streams and user_id == token_user_id do
    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
    required_scopes =
      if stream == "user:notification" do
        ["read:notifications"]
      else
        ["read:statuses"]
      end

    if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
      {:error, :unauthorized}
    else
      {:ok, stream <> ":" <> to_string(user_id)}
    end
  end

  # A user stream without a matching user/token pair is never allowed.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
    {:error, :unauthorized}
  end

  # List streams.
  def get_topic(
        "list",
        %User{id: user_id} = user,
        %Token{user_id: token_user_id} = oauth_token,
        %{"list" => id}
      )
      when user_id == token_user_id do
    cond do
      OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
        {:error, :unauthorized}

      Pleroma.List.get(id, user) ->
        {:ok, "list:" <> to_string(id)}

      true ->
        {:error, :bad_topic}
    end
  end

  # A list stream without a matching user/token pair or without a "list" param.
  def get_topic("list", _user, _oauth_token, _params) do
    {:error, :unauthorized}
  end

  # Anything else is not a known stream.
  def get_topic(_stream, _user, _oauth_token, _params) do
    {:error, :bad_topic}
  end
  @doc """
  Registers the process for streaming on `topic`.

  Use `get_topic/4` to get the full authorized topic first. The value stored
  with the registration is `true` when a `user` is given and `nil` otherwise.
  Nothing is registered when `should_env_send?/0` returns false. Always
  returns `{:ok, topic}`.
  """
  def add_socket(topic, user) do
    if should_env_send?() do
      authenticated =
        case user do
          absent when absent in [nil, false] -> nil
          _present -> true
        end

      Registry.register(@registry, topic, authenticated)
    end

    {:ok, topic}
  end
  @doc """
  Unregisters the process from `topic` in the streamer registry.

  Does nothing (and returns `nil`) when `should_env_send?/0` returns false.
  """
  def remove_socket(topic) do
    if should_env_send?() do
      Registry.unregister(@registry, topic)
    end
  end
  @doc """
  Streams every item in `items` to every topic in `topics`.

  Both arguments may be a single value or a list. Each topic/item pair is
  handled in its own spawned process. Nothing is spawned when
  `should_env_send?/0` returns false. Always returns `:ok`.
  """
  def stream(topics, items) do
    if should_env_send?() do
      for topic <- List.wrap(topics), item <- List.wrap(items) do
        spawn(fn -> do_stream(topic, item) end)
      end
    end

    :ok
  end
  @doc """
  Tells whether `item` has to be withheld from `user`.

  Returns `false` only when every check in the chain passes; as soon as one
  check fails the result is `true`. The checks cover the user's blocks, mutes,
  reblog mutes and domain blocks (against both the item's actor and the actor
  of the normalized parent object), the item's recipients, thread containment
  and thread mutes. A `Notification` is checked through its activity with
  `streamed_type` set to `:notification`.
  """
  def filtered_by_user?(user, item, streamed_type \\ :activity)

  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
    %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

    blocked_or_muted = MapSet.new(blocked ++ muted)
    item_recipients = MapSet.new(item.recipients)
    blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

    # True when the given actor is blocked or muted by the user.
    hidden_actor? = fn actor -> actor in blocked or actor in muted end

    with parent = Object.normalize(item) || item,
         false <- hidden_actor?.(item.actor),
         announce? = item.data["type"] == "Announce",
         # Announces from reblog-muted actors are filtered.
         false <- announce? && item.actor in reblog_muted,
         # On the activity stream, announces of the user's own objects are filtered.
         false <-
           streamed_type == :activity && announce? &&
             parent.data["actor"] == user.ap_id,
         false <- hidden_actor?.(parent.data["actor"]),
         true <- MapSet.disjoint?(item_recipients, blocked_or_muted),
         %{host: item_host} <- URI.parse(item.actor),
         %{host: parent_host} <- URI.parse(parent.data["actor"]),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
         true <- thread_containment(item, user),
         false <- CommonAPI.thread_muted?(user, parent) do
      false
    else
      _ -> true
    end
  end

  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
    filtered_by_user?(user, activity, :notification)
  end
  # Pushes a direct message to the "direct:<id>" topic of each recipient
  # returned by `User.get_recipients_from_activity/1`.
  defp do_stream("direct", item) do
    direct_topics =
      item
      |> User.get_recipients_from_activity()
      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)

    Enum.each(direct_topics, fn direct_topic ->
      Logger.debug("Trying to push direct message to #{direct_topic}\n\n")
      push_to_socket(direct_topic, item)
    end)
  end
  # Pushes a conversation participation to the "direct:<user_id>" topic of
  # the participation's user.
  defp do_stream("participation", participation) do
    direct_topic = "direct:#{participation.user_id}"

    Logger.debug("Trying to push a conversation participation to #{direct_topic}\n\n")

    push_to_socket(direct_topic, participation)
  end
  # Pushes an activity to the "list:<id>" topic of every list returned by
  # `Pleroma.List.get_lists_from_activity/1`.
  defp do_stream("list", item) do
    public? = Visibility.is_public?(item)
    candidate_lists = Pleroma.List.get_lists_from_activity(item)

    # filter the recipient list if the activity is not public, see #270.
    visible_lists =
      if public? == true do
        candidate_lists
      else
        Enum.filter(candidate_lists, fn list ->
          list_owner = User.get_cached_by_id(list.user_id)
          Visibility.visible_for_user?(item, list_owner)
        end)
      end

    list_topics = Enum.map(visible_lists, fn %{id: id} -> "list:#{id}" end)

    Enum.each(list_topics, fn list_topic ->
      Logger.debug("Trying to push message to #{list_topic}\n\n")
      push_to_socket(list_topic, item)
    end)
  end
  # Sends a notification to every process registered for the notified user's
  # "<topic>:<user_id>" topic; the message asks the receiver to render it
  # with `StreamerView` and the "notification.json" template.
  defp do_stream(topic, %Notification{} = item)
       when topic in ["user", "user:notification"] do
    user_topic = "#{topic}:#{item.user_id}"
    message = {:render_with_user, StreamerView, "notification.json", item}

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
    end)
  end
  # Renders a chat update once and sends the resulting text to every process
  # registered for the user's "<topic>:<user id>" topic.
  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
       when topic in ["user", "user:pleroma_chat"] do
    user_topic = "#{topic}:#{user.id}"
    payload = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
    message = {:text, payload}

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      Enum.each(subscribers, fn {pid, _auth} -> send(pid, message) end)
    end)
  end
  # Pushes an item to the "user:<id>" topic of each recipient returned by
  # `User.get_recipients_from_activity/1`.
  defp do_stream("user", item) do
    Logger.debug("Trying to push to users")

    user_topics =
      item
      |> User.get_recipients_from_activity()
      |> Enum.map(fn %{id: id} -> "user:#{id}" end)

    Enum.each(user_topics, fn user_topic -> push_to_socket(user_topic, item) end)
  end
  # Fallback for every other topic: the item is pushed to the topic as given.
  defp do_stream(topic, item) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")

    push_to_socket(topic, item)
  end
  # Conversation participations are rendered once and sent as text.
  defp push_to_socket(topic, %Participation{} = participation) do
    broadcast_text(topic, StreamerView.render("conversation.json", participation))
  end

  # Deletes that carry the id of the deleted activity are sent as a "delete" event.
  defp push_to_socket(topic, %Activity{
         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
       }) do
    broadcast_text(
      topic,
      Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
    )
  end

  # Deletes without a "deleted_activity_id" are not streamed.
  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

  # Everything else is an update. Registrations with a falsy value get the
  # text rendered once up front; the others are asked to render "update.json"
  # themselves.
  defp push_to_socket(topic, item) do
    anonymous_payload = StreamerView.render("update.json", item)

    Registry.dispatch(@registry, topic, fn subscribers ->
      Enum.each(subscribers, fn
        {pid, auth?} when auth? in [nil, false] ->
          send(pid, {:text, anonymous_payload})

        {pid, _auth?} ->
          send(pid, {:render_with_user, StreamerView, "update.json", item})
      end)
    end)
  end

  # Sends `{:text, text}` to every process registered for `topic`.
  defp broadcast_text(topic, text) do
    Registry.dispatch(@registry, topic, fn subscribers ->
      Enum.each(subscribers, fn {pid, _} -> send(pid, {:text, text}) end)
    end)
  end
  # A user with `skip_thread_containment: true` always passes the check.
  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

  # Otherwise the `[:instance, :skip_thread_containment]` setting decides: when
  # it is truthy the check passes, when it is unset or false the decision is
  # left to `ActivityPub.contain_activity/2`.
  defp thread_containment(activity, user) do
    case Config.get([:instance, :skip_thread_containment]) do
      skip when skip in [nil, false] -> ActivityPub.contain_activity(activity, user)
      _skip -> true
    end
  end
  # `should_env_send?/0` is defined at compile time according to the Mix
  # environment the module was compiled in:
  #   * test: true only if the registry process exists and is alive;
  #   * benchmark: always false;
  #   * any other environment: always true.
  cond do
    @mix_env == :test ->
      def should_env_send? do
        registry_pid = Process.whereis(@registry)

        not is_nil(registry_pid) and Process.alive?(registry_pid)
      end

    @mix_env == :benchmark ->
      def should_env_send? do
        false
      end

    true ->
      def should_env_send? do
        true
      end
  end
  274. end