logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

side_effects.ex (19380B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.SideEffects do
  5. @moduledoc """
  6. This module looks at an inserted object and executes the side effects that it
  7. implies. For example, a `Like` activity will increase the like count on the
  8. liked object, a `Follow` activity will add the user to the follower
  9. collection, and so on.
  10. """
  11. alias Pleroma.Activity
  12. alias Pleroma.Chat
  13. alias Pleroma.Chat.MessageReference
  14. alias Pleroma.FollowingRelationship
  15. alias Pleroma.Notification
  16. alias Pleroma.Object
  17. alias Pleroma.Repo
  18. alias Pleroma.User
  19. alias Pleroma.Web.ActivityPub.ActivityPub
  20. alias Pleroma.Web.ActivityPub.Builder
  21. alias Pleroma.Web.ActivityPub.Pipeline
  22. alias Pleroma.Web.ActivityPub.Utils
  23. alias Pleroma.Web.Push
  24. alias Pleroma.Web.Streamer
  25. alias Pleroma.Workers.PollWorker
  26. require Pleroma.Constants
  27. require Logger
  28. @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
  29. @logger Pleroma.Config.get([:side_effects, :logger], Logger)
  30. @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
  31. defp ap_streamer, do: Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
  32. @impl true
  33. def handle(object, meta \\ [])
  34. # Task this handles
  35. # - Follows
  36. # - Sends a notification
  37. @impl true
  38. def handle(
  39. %{
  40. data: %{
  41. "actor" => actor,
  42. "type" => "Accept",
  43. "object" => follow_activity_id
  44. }
  45. } = object,
  46. meta
  47. ) do
  48. with %Activity{actor: follower_id} = follow_activity <-
  49. Activity.get_by_ap_id(follow_activity_id),
  50. %User{} = followed <- User.get_cached_by_ap_id(actor),
  51. %User{} = follower <- User.get_cached_by_ap_id(follower_id),
  52. {:ok, follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "accept"),
  53. {:ok, _follower, followed} <-
  54. FollowingRelationship.update(follower, followed, :follow_accept) do
  55. Notification.update_notification_type(followed, follow_activity)
  56. end
  57. {:ok, object, meta}
  58. end
  59. # Task this handles
  60. # - Rejects all existing follow activities for this person
  61. # - Updates the follow state
  62. # - Dismisses notification
  63. @impl true
  64. def handle(
  65. %{
  66. data: %{
  67. "actor" => actor,
  68. "type" => "Reject",
  69. "object" => follow_activity_id
  70. }
  71. } = object,
  72. meta
  73. ) do
  74. with %Activity{actor: follower_id} = follow_activity <-
  75. Activity.get_by_ap_id(follow_activity_id),
  76. %User{} = followed <- User.get_cached_by_ap_id(actor),
  77. %User{} = follower <- User.get_cached_by_ap_id(follower_id),
  78. {:ok, _follow_activity} <- Utils.update_follow_state_for_all(follow_activity, "reject") do
  79. FollowingRelationship.update(follower, followed, :follow_reject)
  80. Notification.dismiss(follow_activity)
  81. end
  82. {:ok, object, meta}
  83. end
  84. # Tasks this handle
  85. # - Follows if possible
  86. # - Sends a notification
  87. # - Generates accept or reject if appropriate
  88. @impl true
  89. def handle(
  90. %{
  91. data: %{
  92. "id" => follow_id,
  93. "type" => "Follow",
  94. "object" => followed_user,
  95. "actor" => following_user
  96. }
  97. } = object,
  98. meta
  99. ) do
  100. with %User{} = follower <- User.get_cached_by_ap_id(following_user),
  101. %User{} = followed <- User.get_cached_by_ap_id(followed_user),
  102. {_, {:ok, _, _}, _, _} <-
  103. {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
  104. if followed.local && !followed.is_locked do
  105. {:ok, accept_data, _} = Builder.accept(followed, object)
  106. {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
  107. end
  108. else
  109. {:following, {:error, _}, _follower, followed} ->
  110. {:ok, reject_data, _} = Builder.reject(followed, object)
  111. {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)
  112. _ ->
  113. nil
  114. end
  115. {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  116. meta =
  117. meta
  118. |> add_notifications(notifications)
  119. updated_object = Activity.get_by_ap_id(follow_id)
  120. {:ok, updated_object, meta}
  121. end
  122. # Tasks this handles:
  123. # - Unfollow and block
  124. @impl true
  125. def handle(
  126. %{data: %{"type" => "Block", "object" => blocked_user, "actor" => blocking_user}} =
  127. object,
  128. meta
  129. ) do
  130. with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
  131. %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
  132. User.block(blocker, blocked)
  133. end
  134. {:ok, object, meta}
  135. end
  136. # Tasks this handles:
  137. # - Update the user
  138. # - Update a non-user object (Note, Question, etc.)
  139. #
  140. # For a local user, we also get a changeset with the full information, so we
  141. # can update non-federating, non-activitypub settings as well.
  142. @impl true
  143. def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  144. updated_object_id = updated_object["id"]
  145. with {_, true} <- {:has_id, is_binary(updated_object_id)},
  146. %{"type" => type} <- updated_object,
  147. {_, is_user} <- {:is_user, type in Pleroma.Constants.actor_types()} do
  148. if is_user do
  149. handle_update_user(object, meta)
  150. else
  151. handle_update_object(object, meta)
  152. end
  153. else
  154. _ ->
  155. {:ok, object, meta}
  156. end
  157. end
  158. # Tasks this handles:
  159. # - Add like to object
  160. # - Set up notification
  161. @impl true
  162. def handle(%{data: %{"type" => "Like"}} = object, meta) do
  163. liked_object = Object.get_by_ap_id(object.data["object"])
  164. Utils.add_like_to_object(object, liked_object)
  165. Notification.create_notifications(object)
  166. {:ok, object, meta}
  167. end
  168. # Tasks this handles
  169. # - Actually create object
  170. # - Rollback if we couldn't create it
  171. # - Increase the user note count
  172. # - Increase the reply count
  173. # - Increase replies count
  174. # - Set up ActivityExpiration
  175. # - Set up notifications
  176. # - Index incoming posts for search (if needed)
  177. @impl true
  178. def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  179. with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
  180. %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
  181. {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
  182. {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
  183. {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)
  184. if in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"] do
  185. Object.increase_replies_count(in_reply_to)
  186. end
  187. if quote_url = object.data["quoteUrl"] do
  188. Object.increase_quotes_count(quote_url)
  189. end
  190. reply_depth = (meta[:depth] || 0) + 1
  191. # FIXME: Force inReplyTo to replies
  192. if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
  193. object.data["replies"] != nil do
  194. for reply_id <- object.data["replies"] do
  195. Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
  196. "id" => reply_id,
  197. "depth" => reply_depth
  198. })
  199. end
  200. end
  201. ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
  202. Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
  203. end)
  204. Pleroma.Search.add_to_index(Map.put(activity, :object, object))
  205. Utils.maybe_handle_group_posts(activity)
  206. meta =
  207. meta
  208. |> add_notifications(notifications)
  209. ap_streamer().stream_out(activity)
  210. {:ok, activity, meta}
  211. else
  212. e -> Repo.rollback(e)
  213. end
  214. end
  215. # Tasks this handles:
  216. # - Add announce to object
  217. # - Set up notification
  218. # - Stream out the announce
  219. @impl true
  220. def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  221. announced_object = Object.get_by_ap_id(object.data["object"])
  222. user = User.get_cached_by_ap_id(object.data["actor"])
  223. Utils.add_announce_to_object(object, announced_object)
  224. if !User.internal?(user) do
  225. Notification.create_notifications(object)
  226. ap_streamer().stream_out(object)
  227. end
  228. {:ok, object, meta}
  229. end
  230. @impl true
  231. def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  232. with undone_object <- Activity.get_by_ap_id(undone_object),
  233. :ok <- handle_undoing(undone_object) do
  234. {:ok, object, meta}
  235. end
  236. end
  237. # Tasks this handles:
  238. # - Add reaction to object
  239. # - Set up notification
  240. @impl true
  241. def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  242. reacted_object = Object.get_by_ap_id(object.data["object"])
  243. Utils.add_emoji_reaction_to_object(object, reacted_object)
  244. Notification.create_notifications(object)
  245. {:ok, object, meta}
  246. end
  247. # Tasks this handles:
  248. # - Delete and unpins the create activity
  249. # - Replace object with Tombstone
  250. # - Reduce the user note count
  251. # - Reduce the reply count
  252. # - Stream out the activity
  253. # - Removes posts from search index (if needed)
  254. @impl true
  255. def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
  256. deleted_object =
  257. Object.normalize(deleted_object, fetch: false) ||
  258. User.get_cached_by_ap_id(deleted_object)
  259. result =
  260. case deleted_object do
  261. %Object{} ->
  262. with {_, {:ok, deleted_object, _activity}} <- {:object, Object.delete(deleted_object)},
  263. {_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
  264. {_, %User{} = user} <- {:user, User.get_cached_by_ap_id(actor)} do
  265. User.remove_pinned_object_id(user, deleted_object.data["id"])
  266. {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
  267. if in_reply_to = deleted_object.data["inReplyTo"] do
  268. Object.decrease_replies_count(in_reply_to)
  269. end
  270. if quote_url = deleted_object.data["quoteUrl"] do
  271. Object.decrease_quotes_count(quote_url)
  272. end
  273. MessageReference.delete_for_object(deleted_object)
  274. ap_streamer().stream_out(object)
  275. ap_streamer().stream_out_participations(deleted_object, user)
  276. :ok
  277. else
  278. {:actor, _} ->
  279. @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
  280. :no_object_actor
  281. {:user, _} ->
  282. @logger.error(
  283. "The object's actor could not be resolved to a user: #{inspect(deleted_object)}"
  284. )
  285. :no_object_user
  286. {:object, _} ->
  287. @logger.error("The object could not be deleted: #{inspect(deleted_object)}")
  288. {:error, object}
  289. end
  290. %User{} ->
  291. with {:ok, _} <- User.delete(deleted_object) do
  292. :ok
  293. end
  294. end
  295. if result == :ok do
  296. # Only remove from index when deleting actual objects, not users or anything else
  297. with %Pleroma.Object{} <- deleted_object do
  298. Pleroma.Search.remove_from_index(deleted_object)
  299. end
  300. {:ok, object, meta}
  301. else
  302. {:error, result}
  303. end
  304. end
  305. # Tasks this handles:
  306. # - adds pin to user
  307. # - removes expiration job for pinned activity, if was set for expiration
  308. @impl true
  309. def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  310. with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
  311. {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
  312. # if pinned activity was scheduled for deletion, we remove job
  313. if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
  314. Oban.cancel_job(expiration.id)
  315. end
  316. {:ok, object, meta}
  317. else
  318. nil ->
  319. {:error, :user_not_found}
  320. {:error, changeset} ->
  321. if changeset.errors[:pinned_objects] do
  322. {:error, :pinned_statuses_limit_reached}
  323. else
  324. changeset.errors
  325. end
  326. end
  327. end
  328. # Tasks this handles:
  329. # - removes pin from user
  330. # - removes corresponding Add activity
  331. # - if activity had expiration, recreates activity expiration job
  332. @impl true
  333. def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  334. with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
  335. {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
  336. data["object"]
  337. |> Activity.add_by_params_query(user.ap_id, user.featured_address)
  338. |> Repo.delete_all()
  339. # if pinned activity was scheduled for deletion, we reschedule it for deletion
  340. if meta[:expires_at] do
  341. # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
  342. {:ok, expires_at} =
  343. Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])
  344. Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
  345. activity_id: meta[:activity_id],
  346. expires_at: expires_at
  347. })
  348. end
  349. {:ok, object, meta}
  350. else
  351. nil -> {:error, :user_not_found}
  352. error -> error
  353. end
  354. end
  355. # Nothing to do
  356. @impl true
  357. def handle(object, meta) do
  358. {:ok, object, meta}
  359. end
  360. defp handle_update_user(
  361. %{data: %{"type" => "Update", "object" => updated_object}} = object,
  362. meta
  363. ) do
  364. if changeset = Keyword.get(meta, :user_update_changeset) do
  365. changeset
  366. |> User.update_and_set_cache()
  367. else
  368. {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
  369. User.get_by_ap_id(updated_object["id"])
  370. |> User.remote_user_changeset(new_user_data)
  371. |> User.update_and_set_cache()
  372. end
  373. {:ok, object, meta}
  374. end
  375. defp handle_update_object(
  376. %{data: %{"type" => "Update", "object" => updated_object}} = object,
  377. meta
  378. ) do
  379. orig_object_ap_id = updated_object["id"]
  380. orig_object = Object.get_by_ap_id(orig_object_ap_id)
  381. orig_object_data = orig_object.data
  382. updated_object =
  383. if meta[:local] do
  384. # If this is a local Update, we don't process it by transmogrifier,
  385. # so we use the embedded object as-is.
  386. updated_object
  387. else
  388. meta[:object_data]
  389. end
  390. if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
  391. {:ok, _, updated} =
  392. Object.Updater.do_update_and_invalidate_cache(orig_object, updated_object)
  393. if updated do
  394. object
  395. |> Activity.normalize()
  396. |> ActivityPub.notify_and_stream()
  397. end
  398. end
  399. {:ok, object, meta}
  400. end
  401. def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
  402. with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
  403. actor = User.get_cached_by_ap_id(object.data["actor"])
  404. recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
  405. streamables =
  406. [[actor, recipient], [recipient, actor]]
  407. |> Enum.uniq()
  408. |> Enum.map(fn [user, other_user] ->
  409. if user.local do
  410. {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
  411. {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
  412. @cachex.put(
  413. :chat_message_id_idempotency_key_cache,
  414. cm_ref.id,
  415. meta[:idempotency_key]
  416. )
  417. {
  418. ["user", "user:pleroma_chat"],
  419. {user, %{cm_ref | chat: chat, object: object}}
  420. }
  421. end
  422. end)
  423. |> Enum.filter(& &1)
  424. meta =
  425. meta
  426. |> add_streamables(streamables)
  427. {:ok, object, meta}
  428. end
  429. end
  430. def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  431. with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
  432. PollWorker.schedule_poll_end(activity)
  433. {:ok, object, meta}
  434. end
  435. end
  436. def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  437. with {:ok, object, meta} <- Pipeline.common_pipeline(object_map, meta) do
  438. Object.increase_vote_count(
  439. object.data["inReplyTo"],
  440. object.data["name"],
  441. object.data["actor"]
  442. )
  443. {:ok, object, meta}
  444. end
  445. end
  446. def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
  447. when objtype in ~w[Audio Video Image Event Article Note Page] do
  448. with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
  449. {:ok, object, meta}
  450. end
  451. end
  452. # Nothing to do
  453. def handle_object_creation(object, _activity, meta) do
  454. {:ok, object, meta}
  455. end
  456. defp undo_like(nil, object), do: delete_object(object)
  457. defp undo_like(%Object{} = liked_object, object) do
  458. with {:ok, _} <- Utils.remove_like_from_object(object, liked_object) do
  459. delete_object(object)
  460. end
  461. end
  462. def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  463. object.data["object"]
  464. |> Object.get_by_ap_id()
  465. |> undo_like(object)
  466. end
  467. def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  468. with %Object{} = reacted_object <- Object.get_by_ap_id(object.data["object"]),
  469. {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted_object),
  470. {:ok, _} <- Repo.delete(object) do
  471. :ok
  472. end
  473. end
  474. def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  475. with %Object{} = liked_object <- Object.get_by_ap_id(object.data["object"]),
  476. {:ok, _} <- Utils.remove_announce_from_object(object, liked_object),
  477. {:ok, _} <- Repo.delete(object) do
  478. :ok
  479. end
  480. end
  481. def handle_undoing(
  482. %{data: %{"type" => "Block", "actor" => blocker, "object" => blocked}} = object
  483. ) do
  484. with %User{} = blocker <- User.get_cached_by_ap_id(blocker),
  485. %User{} = blocked <- User.get_cached_by_ap_id(blocked),
  486. {:ok, _} <- User.unblock(blocker, blocked),
  487. {:ok, _} <- Repo.delete(object) do
  488. :ok
  489. end
  490. end
  491. def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
  492. @spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
  493. defp delete_object(object) do
  494. with {:ok, _} <- Repo.delete(object), do: :ok
  495. end
  496. defp send_notifications(meta) do
  497. Keyword.get(meta, :notifications, [])
  498. |> Enum.each(fn notification ->
  499. Streamer.stream(["user", "user:notification"], notification)
  500. Push.send(notification)
  501. end)
  502. meta
  503. end
  504. defp send_streamables(meta) do
  505. Keyword.get(meta, :streamables, [])
  506. |> Enum.each(fn {topics, items} ->
  507. Streamer.stream(topics, items)
  508. end)
  509. meta
  510. end
  511. defp add_streamables(meta, streamables) do
  512. existing = Keyword.get(meta, :streamables, [])
  513. meta
  514. |> Keyword.put(:streamables, streamables ++ existing)
  515. end
  516. defp add_notifications(meta, notifications) do
  517. existing = Keyword.get(meta, :notifications, [])
  518. meta
  519. |> Keyword.put(:notifications, notifications ++ existing)
  520. end
  521. @impl true
  522. def handle_after_transaction(meta) do
  523. meta
  524. |> send_notifications()
  525. |> send_streamables()
  526. end
  527. end