logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

streamer.ex (13366B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Streamer do
  5. require Logger
  6. require Pleroma.Constants
  7. alias Pleroma.Activity
  8. alias Pleroma.Chat.MessageReference
  9. alias Pleroma.Config
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Notification
  12. alias Pleroma.Object
  13. alias Pleroma.User
  14. alias Pleroma.Web.ActivityPub.ActivityPub
  15. alias Pleroma.Web.ActivityPub.Visibility
  16. alias Pleroma.Web.CommonAPI
  17. alias Pleroma.Web.OAuth.Token
  18. alias Pleroma.Web.Plugs.OAuthScopesPlug
  19. alias Pleroma.Web.StreamerView
  20. require Pleroma.Constants
  # Name of the Registry this module registers subscribers in and dispatches through.
  @registry Pleroma.Web.StreamerRegistry

  # Exposes the registry name to callers outside this module.
  def registry, do: @registry

  # Streams handled by the public-timeline clause of get_topic/4.
  @public_streams Pleroma.Constants.public_streams()
  # Public streams treated as :local (rather than :federated) by the access check.
  @local_streams ["public:local", "public:local:media"]
  # Streams that get_topic/4 expands to a per-user topic ("<stream>:<user id>").
  @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
  @doc """
  Expands and authorizes a stream, and registers the process for streaming.

  The stream is resolved with `get_topic/4`; on success the resulting topic is
  handed to `add_socket/2`. Any other result of `get_topic/4` is returned as is.
  """
  @spec get_topic_and_add_socket(
          stream :: String.t(),
          User.t() | nil,
          Token.t() | nil,
          map() | nil
        ) ::
          {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
    case get_topic(stream, user, oauth_token, params) do
      {:ok, topic} -> add_socket(topic, oauth_token)
      failure -> failure
    end
  end
  # Decides whether a timeline stream of the given `kind` may be opened.
  #
  # Unless unauthenticated access to that kind of timeline is restricted, access
  # is always granted. When it is restricted, the caller must present a user and
  # an OAuth token belonging to that same user, and the token must carry a scope
  # that satisfies "read:statuses".
  defp can_access_stream(user, oauth_token, kind) do
    case Config.restrict_unauthenticated_access?(:timelines, kind) do
      true ->
        case {user, oauth_token} do
          {%User{id: user_id}, %Token{user_id: user_id}} ->
            OAuthScopesPlug.filter_descendants(["read:statuses"], oauth_token.scopes) != []

          _ ->
            false
        end

      _ ->
        true
    end
  end
  @doc """
  Expands and authorizes a stream.

  Returns `{:ok, topic}` with the full topic name when the stream may be
  opened, `{:ok, nil}` when no stream was requested, `{:error, :unauthorized}`
  when the user/token pair is not allowed to open it, and
  `{:error, :bad_topic}` when the stream is unknown or its parameters are
  unusable.
  """
  @spec get_topic(stream :: String.t() | nil, User.t() | nil, Token.t() | nil, map()) ::
          {:ok, topic :: String.t() | nil} | {:error, :bad_topic} | {:error, :unauthorized}
  def get_topic(stream, user, oauth_token, params \\ %{})

  # No stream requested: there is nothing to expand or authorize.
  def get_topic(nil = _stream, _user, _oauth_token, _params) do
    {:ok, nil}
  end
  # Allow all public streams if the instance allows unauthenticated access.
  # Otherwise, only allow users with valid oauth tokens.
  def get_topic(stream, user, oauth_token, _params) when stream in @public_streams do
    timeline_kind = if stream in @local_streams, do: :local, else: :federated

    case can_access_stream(user, oauth_token, timeline_kind) do
      true -> {:ok, stream}
      false -> {:error, :unauthorized}
    end
  end
  # Allow all hashtag streams.
  # The guard keeps non-binary "tag" values away from `<>`, which would raise an
  # ArgumentError on them; such requests fall through to the catch-all clause
  # and are answered with {:error, :bad_topic}.
  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params)
      when is_binary(tag) do
    {:ok, "hashtag:" <> tag}
  end
  # Allow remote instance streams.
  # Both clauses require the same access as a federated public timeline. The
  # `is_binary/1` guards keep non-binary "instance" values away from `<>`, which
  # would raise an ArgumentError on them; such requests fall through to the
  # catch-all clause and are answered with {:error, :bad_topic}.
  def get_topic("public:remote", user, oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    if can_access_stream(user, oauth_token, :federated) do
      {:ok, "public:remote:" <> instance}
    else
      {:error, :unauthorized}
    end
  end

  def get_topic("public:remote:media", user, oauth_token, %{"instance" => instance} = _params)
      when is_binary(instance) do
    if can_access_stream(user, oauth_token, :federated) do
      {:ok, "public:remote:media:" <> instance}
    else
      {:error, :unauthorized}
    end
  end
  # Expand user streams.
  # Requires a user and an OAuth token that belongs to that same user.
  def get_topic(
        stream,
        %User{id: user_id} = user,
        %Token{user_id: user_id} = oauth_token,
        _params
      )
      when stream in @user_streams do
    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
    required_scopes =
      case stream do
        "user:notification" -> ["read:notifications"]
        _ -> ["read:statuses"]
      end

    case OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) do
      [] -> {:error, :unauthorized}
      _granted -> {:ok, "#{stream}:#{user.id}"}
    end
  end

  # A user stream without a matching user/token pair is never allowed.
  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
    {:error, :unauthorized}
  end
  # List streams.
  # The token must belong to the user and satisfy "read" or "read:lists"; the
  # list itself is then looked up for that user, and a failed lookup is a bad topic.
  def get_topic(
        "list",
        %User{id: user_id} = user,
        %Token{user_id: user_id} = oauth_token,
        %{"list" => id}
      ) do
    if OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] do
      {:error, :unauthorized}
    else
      if Pleroma.List.get(id, user) do
        {:ok, "list:#{id}"}
      else
        {:error, :bad_topic}
      end
    end
  end

  # Any other "list" request (no matching user/token pair, or no "list" param).
  def get_topic("list", _user, _oauth_token, _params) do
    {:error, :unauthorized}
  end
  # Catch-all: anything not matched by an earlier clause is not a valid topic.
  def get_topic(_stream, _user, _oauth_token, _params), do: {:error, :bad_topic}
  @doc """
  Registers the process for streaming. Use `get_topic/3` to get the full authorized topic.

  The registry entry stores the id of `oauth_token`, or `false` when no token
  is given. Nothing is registered when `should_env_send?/0` is false, but
  `{:ok, topic}` is returned either way.
  """
  def add_socket(topic, oauth_token) do
    if should_env_send?() do
      registered_value =
        case oauth_token do
          absent when absent in [nil, false] -> false
          token -> token.id
        end

      Registry.register(@registry, topic, registered_value)
    end

    {:ok, topic}
  end
  @doc "Unregisters `topic` from the streamer registry; does nothing when `should_env_send?/0` is false."
  def remove_socket(topic) do
    if should_env_send?() do
      Registry.unregister(@registry, topic)
    end
  end
  @doc """
  Streams every item to every topic.

  `topics` and `items` may each be a single value or a list. Each topic/item
  pair is handled inline when `:sync_streaming` is configured for this module,
  and in a freshly spawned process otherwise. Nothing happens when
  `should_env_send?/0` is false.
  """
  def stream(topics, items) do
    if should_env_send?() do
      for topic <- List.wrap(topics), item <- List.wrap(items) do
        job = fn -> do_stream(topic, item) end

        case Config.get([__MODULE__, :sync_streaming], false) do
          async when async in [nil, false] -> spawn(job)
          _sync -> job.()
        end
      end
    end
  end
  # Tells whether `item` must be withheld from `user`'s streams.
  # Returns false only when every check of the `with` chain below passes; the
  # first check that fails makes the function return true (filtered).
  # `streamed_type` defaults to :activity; notifications are re-checked through
  # their activity with :notification (see the last clause).
  def filtered_by_user?(user, item, streamed_type \\ :activity)

  def filtered_by_user?(%User{} = user, %Activity{} = item, streamed_type) do
    %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
      User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])

    # Blocked and muted ids combined, for the recipients check below.
    recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
    recipients = MapSet.new(item.recipients)
    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)

    # `parent` is the normalized object when one is available locally
    # (fetch: false), otherwise the activity itself.
    with parent <- Object.normalize(item, fetch: false) || item,
         # The activity's actor is neither blocked nor muted.
         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
         # Announces from actors whose reblogs are muted are filtered.
         true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
         # For plain activities (not notifications), Announces of the user's
         # own object are filtered.
         true <-
           !(streamed_type == :activity && item.data["type"] == "Announce" &&
               parent.data["actor"] == user.ap_id),
         # The parent's actor is neither blocked nor muted.
         true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
         # No recipient of the activity is blocked or muted.
         true <- MapSet.disjoint?(recipients, recipient_blocks),
         %{host: item_host} <- URI.parse(item.actor),
         %{host: parent_host} <- URI.parse(parent.data["actor"]),
         # Neither host matches the user's domain blocks.
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
         true <- thread_containment(item, user),
         # The thread has not been muted by the user.
         false <- CommonAPI.thread_muted?(parent, user) do
      false
    else
      _ -> true
    end
  end

  # A notification is filtered according to its underlying activity; passing
  # :notification disables the "Announce of own object" check above.
  def filtered_by_user?(%User{} = user, %Notification{activity: activity}, _) do
    filtered_by_user?(user, activity, :notification)
  end
  # Pushes a direct message to the "direct:<id>" topic of each recipient of the activity.
  defp do_stream("direct", item) do
    item
    |> User.get_recipients_from_activity()
    |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
    |> Enum.each(fn user_topic ->
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
      push_to_socket(user_topic, item)
    end)
  end
  # Sends a pre-rendered follow relationship update to every subscriber of the
  # follower's "user:<id>" topic.
  defp do_stream("follow_relationship", item) do
    user_topic = "user:#{item.follower.id}"
    text = StreamerView.render("follow_relationships_update.json", item, user_topic)

    Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers, do: send(pid, {:text, text})
    end)
  end
  # Pushes a conversation participation to its owner's "direct:<user id>" topic.
  defp do_stream("participation", participation) do
    owner_topic = "direct:" <> to_string(participation.user_id)

    Logger.debug("Trying to push a conversation participation to #{owner_topic}\n\n")

    push_to_socket(owner_topic, participation)
  end
  # Pushes an activity to the "list:<id>" topic of every list it belongs to.
  defp do_stream("list", item) do
    public? = Visibility.public?(item)
    candidate_lists = Pleroma.List.get_lists_from_activity(item)

    # filter the recipient list if the activity is not public, see #270.
    visible_lists =
      case public? do
        true ->
          candidate_lists

        _ ->
          Enum.filter(candidate_lists, fn list ->
            list_owner = User.get_cached_by_id(list.user_id)
            Visibility.visible_for_user?(item, list_owner)
          end)
      end

    visible_lists
    |> Enum.map(fn %{id: id} -> "list:#{id}" end)
    |> Enum.each(fn list_topic ->
      Logger.debug("Trying to push message to #{list_topic}\n\n")
      push_to_socket(list_topic, item)
    end)
  end
  # Notifications go to the notified user's "<topic>:<user id>" topic; each
  # subscriber is asked to render the notification itself.
  defp do_stream(topic, %Notification{} = item)
       when topic in ["user", "user:notification"] do
    user_topic = "#{topic}:#{item.user_id}"
    render_request = {:render_with_user, StreamerView, "notification.json", item, user_topic}

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers, do: send(pid, render_request)
    end)
  end
  # Chat message updates are rendered once and sent as text to every subscriber
  # of the user's "<topic>:<user id>" topic.
  defp do_stream(topic, {user, %MessageReference{} = cm_ref})
       when topic in ["user", "user:pleroma_chat"] do
    user_topic = "#{topic}:#{user.id}"

    text =
      StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}, user_topic)

    Registry.dispatch(@registry, user_topic, fn subscribers ->
      for {pid, _auth} <- subscribers, do: send(pid, {:text, text})
    end)
  end
  # Pushes an activity to the "user:<id>" topics of its recipients. When the
  # activity is addressed to the public collection, the topics of the ids
  # returned by Pleroma.Hashtag.get_recipients_for_activity/1 are added too.
  # Each topic receives the item once.
  defp do_stream("user", item) do
    Logger.debug("Trying to push to users")

    addressed_topics =
      item
      |> User.get_recipients_from_activity()
      |> Enum.map(fn %{id: id} -> "user:#{id}" end)

    hashtag_topics =
      if Pleroma.Constants.as_public() in item.recipients do
        item
        |> Pleroma.Hashtag.get_recipients_for_activity()
        |> Enum.map(fn id -> "user:#{id}" end)
      else
        []
      end

    (addressed_topics ++ hashtag_topics)
    |> Enum.uniq()
    |> Enum.each(&push_to_socket(&1, item))
  end
  # Fallback: any other topic receives the item as is.
  defp do_stream(topic, item) do
    Logger.debug("Trying to push to #{topic}")
    Logger.debug("Pushing item to #{topic}")

    push_to_socket(topic, item)
  end
  # Conversations are rendered once and sent as text to every subscriber of `topic`.
  defp push_to_socket(topic, %Participation{} = participation) do
    text = StreamerView.render("conversation.json", participation, topic)

    Registry.dispatch(@registry, topic, fn subscribers ->
      for {pid, _} <- subscribers, do: send(pid, {:text, text})
    end)
  end
  # Deletions are announced as a "delete" event carrying the id of the deleted
  # activity as a string.
  defp push_to_socket(topic, %Activity{
         data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
       }) do
    encoded =
      %{event: "delete", payload: to_string(deleted_activity_id)}
      |> Jason.encode!()

    Registry.dispatch(@registry, topic, fn subscribers ->
      for {pid, _} <- subscribers, do: send(pid, {:text, encoded})
    end)
  end

  # A Delete without a "deleted_activity_id" is not pushed.
  defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}) do
    :noop
  end
  # Pushes an edited status to every subscriber of `topic`.
  #
  # The payload is the Create activity of the edited object with the updated
  # object attached. Subscribers registered with a truthy value (add_socket/2
  # stores the OAuth token id) are asked to render the update themselves;
  # the others receive one shared, pre-rendered payload.
  defp push_to_socket(topic, %Activity{data: %{"type" => "Update"}} = item) do
    case Activity.get_create_by_object_ap_id(item.object.data["id"]) do
      # No Create activity was found for the object: there is nothing to
      # render, so skip the push rather than crash in Map.put/3 on `nil`.
      nil ->
        :noop

      found ->
        create_activity = Map.put(found, :object, item.object)
        anon_render = StreamerView.render("status_update.json", create_activity, topic)

        Registry.dispatch(@registry, topic, fn list ->
          Enum.each(list, fn {pid, auth?} ->
            if auth? do
              send(
                pid,
                {:render_with_user, StreamerView, "status_update.json", create_activity, topic}
              )
            else
              send(pid, {:text, anon_render})
            end
          end)
        end)
    end
  end
  # Default push: subscribers registered with a truthy value are asked to render
  # "update.json" themselves; the others get one shared, pre-rendered payload.
  defp push_to_socket(topic, item) do
    shared_text = StreamerView.render("update.json", item, topic)

    Registry.dispatch(@registry, topic, fn subscribers ->
      Enum.each(subscribers, fn
        {pid, anonymous} when anonymous in [nil, false] ->
          send(pid, {:text, shared_text})

        {pid, _auth} ->
          send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
      end)
    end)
  end
  # Thread containment is skipped (true is returned) when the user opted out or
  # when the instance-wide :skip_thread_containment setting is on; otherwise the
  # decision is delegated to ActivityPub.contain_activity/2.
  defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true

  defp thread_containment(activity, user) do
    case Config.get([:instance, :skip_thread_containment]) do
      off when off in [nil, false] -> ActivityPub.contain_activity(activity, user)
      _on -> true
    end
  end
  @doc """
  Sends `:close` to every process registered with the id of `oauth_token`.

  Does nothing when `should_env_send?/0` is false.
  """
  def close_streams_by_oauth_token(oauth_token) do
    if should_env_send?() do
      # Registry entries are {key, pid, value}; keep the pids whose value is the token id.
      match_spec = [
        {{:"$1", :"$2", :"$3"}, [{:==, :"$3", oauth_token.id}], [:"$2"]}
      ]

      @registry
      |> Registry.select(match_spec)
      |> Enum.each(&send(&1, :close))
    end
  end
  # In dev/prod the streamer registry is expected to be started, so return true
  # In test it is possible to have the registry started for a test so it will check
  # In benchmark it will never find the process alive and return false
  def should_env_send? do
    cond do
      # Configured as started: trust the configuration.
      Application.get_env(:pleroma, Pleroma.Application)[:streamer_registry] -> true
      # Otherwise the registry must be registered under its name and alive.
      pid = Process.whereis(@registry) -> Process.alive?(pid)
      true -> false
    end
  end
  360. end