logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

streamer.ex (12918B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Streamer do
  5. require Logger
  6. require Pleroma.Constants
  7. alias Pleroma.Activity
  8. alias Pleroma.Chat.MessageReference
  9. alias Pleroma.Config
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Notification
  12. alias Pleroma.Object
  13. alias Pleroma.User
  14. alias Pleroma.Web.ActivityPub.ActivityPub
  15. alias Pleroma.Web.ActivityPub.Visibility
  16. alias Pleroma.Web.CommonAPI
  17. alias Pleroma.Web.OAuth.Token
  18. alias Pleroma.Web.Plugs.OAuthScopesPlug
  19. alias Pleroma.Web.StreamerView
  # Name of the `Registry` under which streaming processes are registered per topic.
  @registry Pleroma.Web.StreamerRegistry

  # Stream names known to this module. Streams listed in `@local_streams` are
  # authorized as `:local` timelines; `@user_streams` get the user id appended
  # when they are expanded into topics.
  @public_streams Pleroma.Constants.public_streams()
  @local_streams ~w(public:local public:local:media)
  @user_streams ~w(user user:notification direct user:pleroma_chat)

  @doc "Returns the name of the registry used by the streamer."
  def registry do
    @registry
  end
  @doc """
  Expands and authorizes `stream`, then registers the process for the resulting topic.

  Whatever `get_topic/4` returns on failure is handed back to the caller unchanged.
  """
  @spec get_topic_and_add_socket(
          stream :: String.t(),
          User.t() | nil,
          Token.t() | nil,
          map() | nil
        ) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
    case get_topic(stream, user, oauth_token, params) do
      {:ok, topic} -> add_socket(topic, oauth_token)
      failure -> failure
    end
  end
  # Decides whether the user/token pair may subscribe to a timeline of `kind`.
  #
  # If unauthenticated access to that kind of timeline is not restricted, access
  # is always granted. Otherwise the token has to belong to the user and pass the
  # "read:statuses" scope check.
  defp can_access_stream(user, oauth_token, kind) do
    case Config.restrict_unauthenticated_access?(:timelines, kind) do
      true -> token_grants_statuses?(user, oauth_token)
      _not_restricted -> true
    end
  end

  # True only when the token's `user_id` equals the user's id and the scope
  # filter for "read:statuses" leaves at least one scope.
  defp token_grants_statuses?(%User{id: user_id}, %Token{user_id: user_id} = oauth_token) do
    OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []
  end

  defp token_grants_statuses?(_user, _oauth_token), do: false
  @doc """
  Expands and authorizes a stream.

  Returns:

    * `{:ok, nil}` when no stream is given;
    * `{:ok, topic}` with the expanded topic when access is granted;
    * `{:error, :unauthorized}` when the user/token pair may not access the stream;
    * `{:error, :bad_topic}` for unknown streams, for hashtag/remote streams whose
      `"tag"`/`"instance"` parameter is missing or not a string, and for lists that
      `Pleroma.List.get/2` does not find for the user.
  """
  @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map() | nil) ::
          {:ok, topic :: String.t() | nil} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, oauth_token, params \\ %{})

  def get_topic(nil = _stream, _user, _oauth_token, _params) do
    {:ok, nil}
  end

  # Allow all public streams if the instance allows unauthenticated access.
  # Otherwise, only allow users with valid oauth tokens.
  def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
    kind = if stream in @local_streams, do: :local, else: :federated

    if can_access_stream(user, oauth_token, kind) do
      {:ok, stream}
    else
      {:error, :unauthorized}
    end
  end

  # Allow all hashtags streams.
  # The guard keeps a non-string "tag" parameter from raising in `<>`; such
  # requests fall through to the final `:bad_topic` clause instead.
  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
      when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end

  # Allow remote instance streams.
  # As above, a non-string "instance" parameter ends up as `:bad_topic`.
  def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    if can_access_stream(user, oauth_token, :federated) do
      {:ok, "public:remote:" <> instance}
    else
      {:error, :unauthorized}
    end
  end

  def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    if can_access_stream(user, oauth_token, :federated) do
      {:ok, "public:remote:media:" <> instance}
    else
      {:error, :unauthorized}
    end
  end

  # Expand user streams.
  def get_topic(
        stream,
        %User{id: user_id} = user,
        %Token{user_id: user_id} = oauth_token,
        _params
      )
      when stream in @user_streams do
    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
    required_scopes =
      if stream == "user:notification" do
        ["read:notifications"]
      else
        ["read:statuses"]
      end

    if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
      {:error, :unauthorized}
    else
      {:ok, stream <> ":" <> to_string(user.id)}
    end
  end

  # A user stream without a user/token pair whose ids match is never allowed.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
    {:error, :unauthorized}
  end

  # List streams.
  def get_topic(
        "list",
        %User{id: user_id} = user,
        %Token{user_id: user_id} = oauth_token,
        %{"list" => id}
      ) do
    cond do
      OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
        {:error, :unauthorized}

      Pleroma.List.get(id, user) ->
        {:ok, "list:" <> to_string(id)}

      true ->
        {:error, :bad_topic}
    end
  end

  def get_topic("list", _user, _oauth_token, _params) do
    {:error, :unauthorized}
  end

  def get_topic(_stream, _user, _oauth_token, _params) do
    {:error, :bad_topic}
  end
  @doc """
  Registers the process for streaming on `topic`.

  Use `get_topic/3` to get the full authorized topic. The registration is skipped
  when `should_env_send?/0` is false; `{:ok, topic}` is returned either way.
  """
  def add_socket(topic, oauth_token) do
    if should_env_send?() do
      # Sockets without a token are stored with `false` as their registry value.
      registry_value =
        case oauth_token do
          missing when missing in [nil, false] -> false
          token -> token.id
        end

      Registry.register(@registry, topic, registry_value)
    end

    {:ok, topic}
  end
  @doc """
  Unregisters the process from `topic`.

  Does nothing (and returns `nil`) when `should_env_send?/0` is false.
  """
  def remove_socket(topic) do
    if should_env_send?() do
      Registry.unregister(@registry, topic)
    end
  end
  @doc """
  Streams every item to every topic.

  `topics` and `items` may each be a single value or a list. One process is
  spawned per topic/item pair to do the actual delivery. Nothing happens when
  `should_env_send?/0` is false.
  """
  def stream(topics, items) do
    if should_env_send?() do
      topics
      |> List.wrap()
      |> Enum.flat_map(fn topic ->
        items
        |> List.wrap()
        |> Enum.map(fn item -> spawn(fn -> do_stream(topic, item) end) end)
      end)
    end
  end
  @doc """
  Returns `true` when `item` must not be streamed to `user`, `false` otherwise.

  An activity passes (result `false`) only if every check in the chain below
  succeeds: blocks and mutes of the actor and of the parent's actor, reblog
  mutes for announces, announces of the user's own objects (for the `:activity`
  streamed type), blocked/muted recipients, domain blocks, thread containment
  and thread mutes. Notifications are checked through their activity with the
  `:notification` streamed type.
  """
  def filtered_by_user?(user, item, streamed_type \\ :activity)

  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
    %{block: blocked, mute: muted, reblog_mute: reblog_muted} =
      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

    hidden_recipients = MapSet.new(blocked ++ muted)
    item_recipients = MapSet.new(item.recipients)
    blocked_domains = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

    # Fall back to the activity itself when no object can be resolved locally.
    parent = Object.normalize(item, fetch: false) || item

    with true <- item.actor not in blocked and item.actor not in muted,
         false <- item.data["type"] == "Announce" and item.actor in reblog_muted,
         false <-
           streamed_type == :activity and item.data["type"] == "Announce" and
             parent.data["actor"] == user.ap_id,
         true <- parent.data["actor"] not in blocked and parent.data["actor"] not in muted,
         true <- MapSet.disjoint?(item_recipients, hidden_recipients),
         %{host: item_host} <- URI.parse(item.actor),
         %{host: parent_host} <- URI.parse(parent.data["actor"]),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, item_host),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(blocked_domains, parent_host),
         true <- thread_containment(item, user),
         false <- CommonAPI.thread_muted?(user, parent) do
      false
    else
      _failed_check -> true
    end
  end

  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _streamed_type) do
    filtered_by_user?(user, activity, :notification)
  end
  # Delivers one item for one stream name. Clause order matters: the specific
  # stream names and item types come first, the generic fallback comes last.

  # Direct messages go to the "direct:<id>" topic of every recipient.
  defp do_stream("direct", item) do
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
    |> Enum.each(fn user_topic ->
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
      push_to_socket(user_topic, item)
    end)
  end

  # Follow relationship updates are rendered once and sent to the follower's "user" topic.
  defp do_stream("follow_relationship", item) do
    user_topic = "user:#{item.follower.id}"
    text = StreamerView.render("follow_relationships_update.json", item, user_topic)

    Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

    Registry.dispatch(@registry, user_topic, fn entries ->
      Enum.each(entries, fn {pid, _auth} -> send(pid, {:text, text}) end)
    end)
  end

  # Conversation participations go to the owning user's "direct" topic.
  defp do_stream("participation", participation) do
    user_topic = "direct:#{participation.user_id}"

    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")

    push_to_socket(user_topic, participation)
  end

  defp do_stream("list", item) do
    public? = Visibility.public?(item)
    lists = Pleroma.List.get_lists_from_activity(item)

    # filter the recipient list if the activity is not public, see #270.
    recipient_lists =
      if public? == true do
        lists
      else
        Enum.filter(lists, fn list ->
          Visibility.visible_for_user?(item, User.get_cached_by_id(list.user_id))
        end)
      end

    recipient_lists
    |> Enum.map(fn %{id: id} -> "list:#{id}" end)
    |> Enum.each(fn list_topic ->
      Logger.debug("Trying to push message to #{list_topic}\n\n")
      push_to_socket(list_topic, item)
    end)
  end

  # Notifications are rendered by the receiving process itself.
  defp do_stream(topic, %Notification{} = item)
       when topic in ["user", "user:notification"] do
    user_topic = "#{topic}:#{item.user_id}"
    message = {:render_with_user, StreamerView, "notification.json", item, user_topic}

    Registry.dispatch(@registry, user_topic, fn entries ->
      Enum.each(entries, fn {pid, _auth} -> send(pid, message) end)
    end)
  end

  # Chat message updates are rendered once and sent as text.
  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
       when topic in ["user", "user:pleroma_chat"] do
    user_topic = "#{topic}:#{user.id}"

    text =
      StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, user_topic)

    Registry.dispatch(@registry, user_topic, fn entries ->
      Enum.each(entries, fn {pid, _auth} -> send(pid, {:text, text}) end)
    end)
  end

  # Any other item on the "user" stream goes to every recipient's "user:<id>" topic.
  defp do_stream("user", item) do
    Logger.debug("Trying to push to users")

    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "user:#{id}" end)
    |> Enum.each(&push_to_socket(&1, item))
  end

  # Fallback: the topic is used as is.
  defp do_stream(topic, item) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")
    push_to_socket(topic, item)
  end
  # Sends `item` to every process registered for `topic`. The clauses pick the
  # payload depending on the kind of item.

  # Conversation participations are rendered once and sent as text.
  defp push_to_socket(topic, %Participation{} = participation) do
    broadcast_text(topic, StreamerView.render("conversation.json", participation, topic))
  end

  # Deletes carrying the id of the deleted activity become a "delete" event.
  defp push_to_socket(topic, %Activity{
         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
       }) do
    rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
    broadcast_text(topic, rendered)
  end

  # Deletes without a deleted activity id are not streamed.
  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop

  # Updates are streamed as the Create activity of the object, carrying the
  # object attached to the Update.
  defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
    case Activity.get_create_by_object_ap_id(item.object.data["id"]) do
      nil ->
        # No Create activity was found for the object, so there is nothing to
        # render. Returning here avoids the BadMapError that putting a key into
        # `nil` would raise.
        :noop

      create_activity ->
        broadcast_rendered(topic, "status_update.json", %{create_activity | object: item.object})
    end
  end

  defp push_to_socket(topic, item) do
    broadcast_rendered(topic, "update.json", item)
  end

  # Sends the same pre-rendered text to every process registered for `topic`.
  defp broadcast_text(topic, text) do
    Registry.dispatch(@registry, topic, fn entries ->
      Enum.each(entries, fn {pid, _auth} -> send(pid, {:text, text}) end)
    end)
  end

  # Processes registered with a truthy value (a token id) are asked to render
  # `template` themselves; the others get the shared render produced here.
  defp broadcast_rendered(topic, template, item) do
    anon_render = StreamerView.render(template, item, topic)

    Registry.dispatch(@registry, topic, fn entries ->
      Enum.each(entries, fn {pid, auth?} ->
        if auth? do
          send(pid, {:render_with_user, StreamerView, template, item, topic})
        else
          send(pid, {:text, anon_render})
        end
      end)
    end)
  end
  # Thread containment check. It is skipped (result `true`) when either the user
  # or the instance configuration has `skip_thread_containment` set; otherwise
  # the decision is delegated to `ActivityPub.contain_activity/2`.
  defp thread_containment(activity, user) do
    cond do
      match?(%User{skip_thread_containment: true}, user) -> true
      Config.get([:instance, :skip_thread_containment]) -> true
      true -> ActivityPub.contain_activity(activity, user)
    end
  end
  @doc """
  Sends `:close` to every process whose registry value equals `oauth_token.id`.

  Nothing is done when `should_env_send?/0` is false.
  """
  def close_streams_by_oauth_token(oauth_token) do
    if should_env_send?() do
      # Select the pid of each entry whose stored value is this token's id;
      # the first element of the entry tuple is not needed.
      match_spec = [
        {{:_, :"$1", :"$2"}, [{:==, :"$2", oauth_token.id}], [:"$1"]}
      ]

      @registry
      |> Registry.select(match_spec)
      |> Enum.each(&send(&1, :close))
    end
  end
  # In dev/prod the streamer registry is expected to be started, so this returns true.
  # In test the registry may have been started for a particular test, so the
  # registered process is looked up and checked.
  # In benchmark the process is never found alive, so this returns false.
  def should_env_send? do
    app_config = Application.get_env(:pleroma, Pleroma.Application)

    cond do
      app_config[:streamer_registry] -> true
      registry_pid = Process.whereis(@registry) -> Process.alive?(registry_pid)
      true -> false
    end
  end
  346. end