logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

side_effects.ex (19345B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.SideEffects do
  5. @moduledoc """
  6. This module looks at an inserted object and executes the side effects that it
  7. implies. For example, a `Like` activity will increase the like count on the
  8. liked object, a `Follow` activity will add the user to the follower
  9. collection, and so on.
  10. """
  11. alias Pleroma.Activity
  12. alias Pleroma.Chat
  13. alias Pleroma.Chat.MessageReference
  14. alias Pleroma.FollowingRelationship
  15. alias Pleroma.Notification
  16. alias Pleroma.Object
  17. alias Pleroma.Repo
  18. alias Pleroma.User
  19. alias Pleroma.Web.ActivityPub.ActivityPub
  20. alias Pleroma.Web.ActivityPub.Builder
  21. alias Pleroma.Web.ActivityPub.Pipeline
  22. alias Pleroma.Web.ActivityPub.Utils
  23. alias Pleroma.Web.Streamer
  24. alias Pleroma.Workers.PollWorker
  25. require Pleroma.Constants
  26. require Logger
  27. @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
  28. @logger Pleroma.Config.get([:side_effects, :logger], Logger)
  29. @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
  # Reads `[:side_effects, :ap_streamer]` from the config on every call,
  # passing `ActivityPub` as the default.
  defp ap_streamer do
    Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
  end
  # Bodiless head: declares the `[]` default for `meta` once for every
  # `handle/2` clause.
  @impl true
  def handle(object, meta \\ [])
  # Tasks this handles:
  # - Sets the state of the referenced follow activities to "accept"
  # - Updates the following relationship with `:follow_accept`
  # - Updates the notification type for the followed user
  #
  # Every step is best-effort: as soon as a lookup or update does not match,
  # the remaining steps are skipped and the activity is still returned.
  @impl true
  def handle(
        %{data: %{"type" => "Accept", "actor" => accepter_ap_id, "object" => follow_ap_id}} =
          object,
        meta
      ) do
    with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
         %User{} = accepter <- User.get_cached_by_ap_id(accepter_ap_id),
         %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
         {:ok, follow} <- Utils.update_follow_state_for_all(follow, "accept"),
         {:ok, _requester, accepter} <-
           FollowingRelationship.update(requester, accepter, :follow_accept) do
      Notification.update_notification_type(accepter, follow)
    end

    {:ok, object, meta}
  end
  # Tasks this handles:
  # - Sets the state of the referenced follow activities to "reject"
  # - Updates the following relationship with `:follow_reject`
  # - Dismisses the notification of the follow activity
  #
  # Every step is best-effort: as soon as a lookup or update does not match,
  # the remaining steps are skipped and the activity is still returned.
  @impl true
  def handle(
        %{data: %{"type" => "Reject", "actor" => rejecter_ap_id, "object" => follow_ap_id}} =
          object,
        meta
      ) do
    with %Activity{actor: requester_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
         %User{} = rejecter <- User.get_cached_by_ap_id(rejecter_ap_id),
         %User{} = requester <- User.get_cached_by_ap_id(requester_ap_id),
         {:ok, _follow} <- Utils.update_follow_state_for_all(follow, "reject") do
      FollowingRelationship.update(requester, rejecter, :follow_reject)
      Notification.dismiss(follow)
    end

    {:ok, object, meta}
  end
  # Tasks this handles:
  # - Follows (as pending) if possible
  # - Generates an Accept when the followed user is local and not locked,
  #   or a Reject when the follow attempt returned an error
  # - Sets up notifications
  @impl true
  def handle(
        %{
          data: %{
            "type" => "Follow",
            "id" => follow_ap_id,
            "actor" => follower_ap_id,
            "object" => followed_ap_id
          }
        } = object,
        meta
      ) do
    # The outcome of this expression is intentionally discarded: unknown users
    # or unexpected `User.follow/3` results simply produce no Accept/Reject.
    with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
         %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
      case User.follow(follower, followed, :follow_pending) do
        {:ok, _, _} ->
          if followed.local && !followed.is_locked do
            {:ok, accept_data, _} = Builder.accept(followed, object)
            {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
          end

        {:error, _} ->
          {:ok, reject_data, _} = Builder.reject(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

        _ ->
          nil
      end
    end

    {:ok, notifications} = Notification.create_notifications(object)
    meta = add_notifications(meta, notifications)

    # Return the activity as freshly loaded by its id rather than the struct
    # that was passed in.
    {:ok, Activity.get_by_ap_id(follow_ap_id), meta}
  end
  # Tasks this handles:
  # - Calls `User.block/2` once both the blocking and the blocked user resolve
  #
  # Unknown users are ignored; the activity is returned either way.
  @impl true
  def handle(
        %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
          object,
        meta
      ) do
    with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
         %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id) do
      User.block(blocker, blocked)
    end

    {:ok, object, meta}
  end
  # Tasks this handles:
  # - Update the user
  # - Update a non-user object (Note, Question, etc.)
  #
  # For a local user, we also get a changeset with the full information, so we
  # can update non-federating, non-activitypub settings as well.
  #
  # An embedded object without a binary "id" or without a "type" is a no-op.
  @impl true
  def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
    case {updated_object["id"], updated_object} do
      {id, %{"type" => type}} when is_binary(id) ->
        # Actor types are routed to the user update, everything else to the
        # generic object update.
        if type in Pleroma.Constants.actor_types() do
          handle_update_user(object, meta)
        else
          handle_update_object(object, meta)
        end

      _ ->
        {:ok, object, meta}
    end
  end
  # Tasks this handles:
  # - Add like to object
  # - Set up notification
  @impl true
  def handle(%{data: %{"type" => "Like"} = data} = object, meta) do
    liked = Object.get_by_ap_id(data["object"])
    Utils.add_like_to_object(object, liked)

    {:ok, notifications} = Notification.create_notifications(object)

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles:
  # - Actually create object
  # - Rollback if we couldn't create it
  # - Increase the user note count
  # - Increase the replies count (and quotes count)
  # - Enqueue fetching of the object's replies, within the allowed thread distance
  # - Set up notifications
  # - Index incoming posts for search (if needed)
  # - Stream out the activity
  @impl true
  def handle(%{data: %{"type" => "Create"} = activity_data} = activity, meta) do
    with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
         %User{} = author <- User.get_cached_by_ap_id(activity_data["actor"]) do
      {:ok, notifications} = Notification.create_notifications(activity)
      {:ok, _author} = ActivityPub.increase_note_count_if_public(author, object)
      {:ok, _author} = ActivityPub.update_last_status_at_if_public(author, object)

      # "Answer" objects also carry `inReplyTo`, but they are counted as votes
      # in `handle_object_creation/3`, not as replies.
      parent_id = object.data["type"] != "Answer" && object.data["inReplyTo"]

      if parent_id do
        Object.increase_replies_count(parent_id)
      end

      quoted_id = object.data["quoteUrl"]

      if quoted_id do
        Object.increase_quotes_count(quoted_id)
      end

      next_depth = (meta[:depth] || 0) + 1
      reply_ids = object.data["replies"]

      # FIXME: Force inReplyTo to replies
      if Pleroma.Web.Federator.allowed_thread_distance?(next_depth) and reply_ids != nil do
        Enum.each(reply_ids, fn reply_id ->
          Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
            "id" => reply_id,
            "depth" => next_depth
          })
        end)
      end

      # Called for its side effects only; the result is discarded.
      Pleroma.Web.RichMedia.Card.get_by_activity(activity)

      activity
      |> Map.put(:object, object)
      |> Pleroma.Search.add_to_index()

      Utils.maybe_handle_group_posts(activity)

      meta = add_notifications(meta, notifications)

      ap_streamer().stream_out(activity)

      {:ok, activity, meta}
    else
      # Any failure (object creation or unknown actor) aborts the transaction.
      failure -> Repo.rollback(failure)
    end
  end
  # Tasks this handles:
  # - Add announce to object
  # - Set up notification
  # - Stream out the announce (skipped when `User.internal?/1` is true for the actor)
  @impl true
  def handle(%{data: %{"type" => "Announce"} = data} = object, meta) do
    announced = Object.get_by_ap_id(data["object"])
    announcer = User.get_cached_by_ap_id(data["actor"])

    Utils.add_announce_to_object(object, announced)

    {:ok, notifications} = Notification.create_notifications(object)

    if !User.internal?(announcer) do
      ap_streamer().stream_out(object)
    end

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles:
  # - Looks up the undone activity and delegates to `handle_undoing/1`
  #
  # Anything other than `:ok` from `handle_undoing/1` is returned unchanged.
  @impl true
  def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
    undone_activity = Activity.get_by_ap_id(undone_ap_id)

    case handle_undoing(undone_activity) do
      :ok -> {:ok, object, meta}
      other -> other
    end
  end
  # Tasks this handles:
  # - Add reaction to object
  # - Set up notification
  @impl true
  def handle(%{data: %{"type" => "EmojiReact"} = data} = object, meta) do
    reacted = Object.get_by_ap_id(data["object"])
    Utils.add_emoji_reaction_to_object(object, reacted)

    {:ok, notifications} = Notification.create_notifications(object)

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles:
  # - Delete and unpins the create activity
  # - Replace object with Tombstone
  # - Reduce the user note count
  # - Reduce the reply count (and quotes count)
  # - Stream out the activity
  # - Removes posts from search index (if needed)
  #
  # The "object" of the Delete is resolved to an `Object` first and to a
  # `User` otherwise. Returns `{:ok, object, meta}` on success and
  # `{:error, reason}` when any step reports something other than `:ok`.
  @impl true
  def handle(%{data: %{"type" => "Delete", "object" => target_ref}} = object, meta) do
    target = Object.normalize(target_ref, fetch: false) || User.get_cached_by_ap_id(target_ref)

    outcome =
      case target do
        %Object{} ->
          with {_, {:ok, deleted, _activity}} <- {:object, Object.delete(target)},
               {_, actor} when is_binary(actor) <- {:actor, deleted.data["actor"]},
               {_, %User{} = user} <- {:user, User.get_cached_by_ap_id(actor)} do
            User.remove_pinned_object_id(user, deleted.data["id"])

            {:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted)

            parent_id = deleted.data["inReplyTo"]
            if parent_id, do: Object.decrease_replies_count(parent_id)

            quoted_id = deleted.data["quoteUrl"]
            if quoted_id, do: Object.decrease_quotes_count(quoted_id)

            MessageReference.delete_for_object(deleted)

            ap_streamer().stream_out(object)
            ap_streamer().stream_out_participations(deleted, user)
            :ok
          else
            # The log messages below inspect `target`, i.e. the object as it
            # was before `Object.delete/1` ran.
            {:actor, _} ->
              @logger.error("The object doesn't have an actor: #{inspect(target)}")
              :no_object_actor

            {:user, _} ->
              @logger.error(
                "The object's actor could not be resolved to a user: #{inspect(target)}"
              )

              :no_object_user

            {:object, _} ->
              @logger.error("The object could not be deleted: #{inspect(target)}")
              {:error, object}
          end

        %User{} ->
          with {:ok, _} <- User.delete(target), do: :ok
      end

    case outcome do
      :ok ->
        # Only remove from index when deleting actual objects, not users or anything else
        if match?(%Object{}, target) do
          Pleroma.Search.remove_from_index(target)
        end

        {:ok, object, meta}

      failure ->
        {:error, failure}
    end
  end
  # Tasks this handles:
  # - adds pin to user
  # - removes expiration job for pinned activity, if was set for expiration
  #
  # Returns `{:ok, object, meta}` on success, otherwise one of:
  # - `{:error, :user_not_found}` when the actor cannot be resolved
  # - `{:error, :pinned_statuses_limit_reached}` when the changeset carries a
  #   `:pinned_objects` error
  # - `{:error, errors}` with the changeset errors for any other failure
  @impl true
  def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
    with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
         {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
      # if pinned activity was scheduled for deletion, we remove job
      if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
        Oban.cancel_job(expiration.id)
      end

      {:ok, object, meta}
    else
      nil ->
        {:error, :user_not_found}

      {:error, changeset} ->
        if changeset.errors[:pinned_objects] do
          {:error, :pinned_statuses_limit_reached}
        else
          # Keep the failure tagged: a bare error list is indistinguishable
          # from a successful return for callers matching on `{:error, _}`.
          {:error, changeset.errors}
        end
    end
  end
  # Tasks this handles:
  # - removes pin from user
  # - removes corresponding Add activity
  # - if activity had expiration, recreates activity expiration job
  @impl true
  def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
    with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
         {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
      add_activities_query =
        Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

      Repo.delete_all(add_activities_query)

      # if pinned activity was scheduled for deletion, we reschedule it for deletion
      if raw_expires_at = meta[:expires_at] do
        # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
        {:ok, expires_at} =
          Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(raw_expires_at)

        Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
          activity_id: meta[:activity_id],
          expires_at: expires_at
        })
      end

      {:ok, object, meta}
    else
      nil -> {:error, :user_not_found}
      failure -> failure
    end
  end
  # Catch-all: activities without dedicated side effects pass through untouched.
  @impl true
  def handle(object, meta), do: {:ok, object, meta}
  # Applies an `Update` whose embedded object is an actor.
  #
  # Uses the changeset under `:user_update_changeset` in `meta` when present;
  # otherwise builds a remote-user changeset from the embedded object. The
  # outcome of the update is not inspected; the activity is always returned
  # as `{:ok, object, meta}`.
  defp handle_update_user(
         %{data: %{"type" => "Update", "object" => updated_object}} = object,
         meta
       ) do
    changeset =
      case Keyword.get(meta, :user_update_changeset) do
        absent when absent in [nil, false] ->
          {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

          updated_object["id"]
          |> User.get_by_ap_id()
          |> User.remote_user_changeset(new_user_data)

        provided ->
          provided
      end

    User.update_and_set_cache(changeset)

    {:ok, object, meta}
  end
  # Applies an `Update` whose embedded object is not an actor.
  #
  # The stored object is looked up by the embedded object's "id". It is only
  # updated when its stored type is one of
  # `Pleroma.Constants.updatable_object_types()`; when the updater reports a
  # change, the activity is normalized and passed to
  # `ActivityPub.notify_and_stream/1`. Always returns `{:ok, object, meta}`.
  defp handle_update_object(
         %{data: %{"type" => "Update", "object" => updated_object}} = object,
         meta
       ) do
    stored_object = Object.get_by_ap_id(updated_object["id"])
    stored_type = stored_object.data["type"]

    # If this is a local Update, we don't process it by transmogrifier,
    # so we use the embedded object as-is.
    new_data = if meta[:local], do: updated_object, else: meta[:object_data]

    if stored_type in Pleroma.Constants.updatable_object_types() do
      {:ok, _, changed} = Object.Updater.do_update_and_invalidate_cache(stored_object, new_data)

      if changed do
        normalized = Activity.normalize(object)
        ActivityPub.notify_and_stream(normalized)
      end
    end

    {:ok, object, meta}
  end
  # Creates a `ChatMessage` object and prepares chat streamables.
  #
  # For each local participant (actor and first "to" recipient) this bumps or
  # creates the chat, creates a message reference, stores the idempotency key
  # from `meta` in the cache under the reference id, and adds a streamable
  # for the "user" / "user:pleroma_chat" topics to `meta`.
  def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
    with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
      actor = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      # `Enum.uniq/1` collapses the two pairs into one when actor and
      # recipient are the same user.
      participant_pairs = Enum.uniq([[actor, recipient], [recipient, actor]])

      streamables =
        for [user, other_user] <- participant_pairs, user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)

          @cachex.put(
            :chat_message_id_idempotency_key_cache,
            cm_ref.id,
            meta[:idempotency_key]
          )

          {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
        end

      {:ok, object, add_streamables(meta, streamables)}
    end
  end
  # Creates a `Question` object and, on success, calls
  # `PollWorker.schedule_poll_end/1` with the activity. Pipeline failures are
  # returned unchanged.
  def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
    case Pipeline.common_pipeline(object, meta) do
      {:ok, _object, _meta} = created ->
        PollWorker.schedule_poll_end(activity)
        created

      failure ->
        failure
    end
  end
  # Creates an `Answer` object and, on success, calls
  # `Object.increase_vote_count/3` with the answer's "inReplyTo", "name" and
  # "actor". Pipeline failures are returned unchanged.
  def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
    case Pipeline.common_pipeline(object_map, meta) do
      {:ok, answer, meta} ->
        answer_data = answer.data

        Object.increase_vote_count(
          answer_data["inReplyTo"],
          answer_data["name"],
          answer_data["actor"]
        )

        {:ok, answer, meta}

      failure ->
        failure
    end
  end
  # Object types that need nothing beyond the common pipeline: its result is
  # returned as-is, whether it succeeded or not.
  def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
      when objtype in ["Audio", "Video", "Image", "Event", "Article", "Note", "Page"] do
    Pipeline.common_pipeline(object, meta)
  end
  # Catch-all: any other object data is passed through without being created.
  def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
  # Undoes a like. When the liked object was not found (`nil`), only the like
  # activity itself is deleted; otherwise the like is removed from the object
  # first and a failure there is returned without deleting anything.
  defp undo_like(nil, object), do: delete_object(object)

  defp undo_like(%Object{} = liked_object, object) do
    case Utils.remove_like_from_object(object, liked_object) do
      {:ok, _} -> delete_object(object)
      failure -> failure
    end
  end
  # Undoes a `Like`: looks up the liked object and delegates to `undo_like/2`.
  def handle_undoing(%{data: %{"type" => "Like"} = data} = object) do
    liked_object = Object.get_by_ap_id(data["object"])
    undo_like(liked_object, object)
  end
  # Undoes an `EmojiReact`: removes the reaction from the reacted object and
  # deletes the reaction activity. The first step that does not match is
  # returned as-is (including `nil` when the object is not found).
  def handle_undoing(%{data: %{"type" => "EmojiReact"} = data} = object) do
    with %Object{} = reacted <- Object.get_by_ap_id(data["object"]),
         {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, reacted),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end
  # Undoes an `Announce`: removes the announce from the announced object and
  # deletes the announce activity. The first step that does not match is
  # returned as-is (including `nil` when the object is not found).
  def handle_undoing(%{data: %{"type" => "Announce"} = data} = object) do
    with %Object{} = announced <- Object.get_by_ap_id(data["object"]),
         {:ok, _} <- Utils.remove_announce_from_object(object, announced),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end
  # Undoes a `Block`: unblocks the blocked user and deletes the block
  # activity. The first step that does not match is returned as-is.
  def handle_undoing(
        %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
          object
      ) do
    with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
         %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
         {:ok, _} <- User.unblock(blocker, blocked),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end
  # Catch-all: anything without a dedicated undo clause is reported as an error.
  def handle_undoing(object) do
    {:error, ["don't know how to handle", object]}
  end
  # Deletes the given record via the repo, collapsing a successful delete to
  # `:ok` and passing any other result through.
  @spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
  defp delete_object(object) do
    case Repo.delete(object) do
      {:ok, _} -> :ok
      failure -> failure
    end
  end
  # Hands the notifications collected under `:notifications` in `meta`
  # (default `[]`) to `Notification.send/1`; returns `meta` unchanged.
  defp send_notifications(meta) do
    pending = Keyword.get(meta, :notifications, [])
    Notification.send(pending)

    meta
  end
  # Streams every `{topics, items}` pair collected under `:streamables` in
  # `meta` (default `[]`); returns `meta` unchanged.
  defp send_streamables(meta) do
    pending = Keyword.get(meta, :streamables, [])

    Enum.each(pending, fn {topics, items} -> Streamer.stream(topics, items) end)

    meta
  end
  # Puts `streamables` in front of whatever is already stored under
  # `:streamables` in `meta`.
  defp add_streamables(meta, streamables) do
    already_queued = Keyword.get(meta, :streamables, [])
    Keyword.put(meta, :streamables, streamables ++ already_queued)
  end
  # Puts `notifications` in front of whatever is already stored under
  # `:notifications` in `meta`.
  defp add_notifications(meta, notifications) do
    already_queued = Keyword.get(meta, :notifications, [])
    Keyword.put(meta, :notifications, notifications ++ already_queued)
  end
  # Sends the notifications and then the streamables accumulated in `meta`;
  # returns `meta`.
  @impl true
  def handle_after_transaction(meta) do
    notified_meta = send_notifications(meta)
    send_streamables(notified_meta)
  end
  528. end