logo

pleroma

My custom branch(es) of git.pleroma.social/pleroma/pleroma — git clone https://anongit.hacktivis.me/git/pleroma.git/

side_effects.ex (19431B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.SideEffects do
  5. @moduledoc """
  6. This module looks at an inserted object and executes the side effects that it
  7. implies. For example, a `Like` activity will increase the like count on the
  8. liked object, a `Follow` activity will add the user to the follower
  9. collection, and so on.
  10. """
  11. alias Pleroma.Activity
  12. alias Pleroma.Chat
  13. alias Pleroma.Chat.MessageReference
  14. alias Pleroma.FollowingRelationship
  15. alias Pleroma.Notification
  16. alias Pleroma.Object
  17. alias Pleroma.Repo
  18. alias Pleroma.User
  19. alias Pleroma.Web.ActivityPub.ActivityPub
  20. alias Pleroma.Web.ActivityPub.Builder
  21. alias Pleroma.Web.ActivityPub.Pipeline
  22. alias Pleroma.Web.ActivityPub.Utils
  23. alias Pleroma.Web.Streamer
  24. alias Pleroma.Workers.PollWorker
  25. require Pleroma.Constants
  26. require Logger
  27. @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
  28. @logger Pleroma.Config.get([:side_effects, :logger], Logger)
  29. @behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
  # Module used to stream activities out. It is read from the config on every
  # call and falls back to `ActivityPub` when nothing is configured.
  defp ap_streamer do
    Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
  end
  @impl true
  def handle(object, meta \\ [])

  # Tasks this handles (Accept):
  # - Sets the state of the referenced follow activity to "accept"
  #   (`Utils.update_follow_state_for_all/2`)
  # - Updates the following relationship with `:follow_accept`
  # - Updates the type of the related notification
  #
  # The result is always `{:ok, object, meta}`: when a lookup or update step
  # does not match, the remaining steps are skipped and nothing is reported.
  @impl true
  def handle(
        %{
          data: %{
            "type" => "Accept",
            "actor" => followed_ap_id,
            "object" => follow_ap_id
          }
        } = object,
        meta
      ) do
    with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
         %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id),
         %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
         {:ok, follow} <- Utils.update_follow_state_for_all(follow, "accept"),
         {:ok, _follower, followed} <-
           FollowingRelationship.update(follower, followed, :follow_accept) do
      Notification.update_notification_type(followed, follow)
    end

    {:ok, object, meta}
  end
  # Tasks this handles (Reject):
  # - Sets the state of the referenced follow activity to "reject"
  #   (`Utils.update_follow_state_for_all/2`)
  # - Updates the following relationship with `:follow_reject`
  # - Dismisses the notification of the follow activity
  #
  # The result is always `{:ok, object, meta}`; a failed lookup or state
  # update simply skips the remaining steps.
  @impl true
  def handle(
        %{
          data: %{
            "type" => "Reject",
            "actor" => followed_ap_id,
            "object" => follow_ap_id
          }
        } = object,
        meta
      ) do
    with %Activity{actor: follower_ap_id} = follow <- Activity.get_by_ap_id(follow_ap_id),
         %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id),
         %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
         {:ok, _follow} <- Utils.update_follow_state_for_all(follow, "reject") do
      FollowingRelationship.update(follower, followed, :follow_reject)
      Notification.dismiss(follow)
    end

    {:ok, object, meta}
  end
  # Tasks this handles (Follow):
  # - Registers the follow as pending (`User.follow/3` with `:follow_pending`)
  # - Runs an Accept through the pipeline when the followed user is local and
  #   not locked
  # - Runs a Reject through the pipeline when the follow attempt returns an error
  # - Creates notifications and adds them to `meta`
  #
  # The returned activity is re-read by its AP id after the steps above.
  @impl true
  def handle(
        %{
          data: %{
            "type" => "Follow",
            "id" => follow_ap_id,
            "actor" => follower_ap_id,
            "object" => followed_ap_id
          }
        } = object,
        meta
      ) do
    with %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
         %User{} = followed <- User.get_cached_by_ap_id(followed_ap_id) do
      case User.follow(follower, followed, :follow_pending) do
        {:ok, _, _} ->
          if followed.local && !followed.is_locked do
            {:ok, accept_data, _} = Builder.accept(followed, object)
            {:ok, _activity, _} = Pipeline.common_pipeline(accept_data, local: true)
          end

        {:error, _} ->
          {:ok, reject_data, _} = Builder.reject(followed, object)
          {:ok, _activity, _} = Pipeline.common_pipeline(reject_data, local: true)

        _ ->
          nil
      end
    end

    {:ok, notifications} = Notification.create_notifications(object)
    refreshed = Activity.get_by_ap_id(follow_ap_id)

    {:ok, refreshed, add_notifications(meta, notifications)}
  end
  # Tasks this handles (Block):
  # - Calls `User.block/2` for the actor against the targeted user
  #
  # Nothing happens when either side cannot be resolved to a cached user; the
  # activity is passed through unchanged in every case.
  @impl true
  def handle(
        %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
          object,
        meta
      ) do
    blocker = User.get_cached_by_ap_id(blocker_ap_id)
    # The blocked user is only looked up once the blocker has been resolved.
    blocked = if match?(%User{}, blocker), do: User.get_cached_by_ap_id(blocked_ap_id)

    if match?(%User{}, blocked), do: User.block(blocker, blocked)

    {:ok, object, meta}
  end
  # Tasks this handles (Update):
  # - Update the user
  # - Update a non-user object (Note, Question, etc.)
  #
  # The embedded object must carry a binary "id" and a "type"; otherwise the
  # activity is passed through untouched. The "type" decides whether the user
  # or the object update path is taken.
  #
  # For a local user, we also get a changeset with the full information, so we
  # can update non-federating, non-activitypub settings as well.
  @impl true
  def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
    target_id = updated_object["id"]

    case updated_object do
      %{"type" => type} when is_binary(target_id) ->
        if type in Pleroma.Constants.actor_types() do
          handle_update_user(object, meta)
        else
          handle_update_object(object, meta)
        end

      _ ->
        {:ok, object, meta}
    end
  end
  # Tasks this handles (Like):
  # - Adds the like to the liked object
  # - Creates notifications and adds them to `meta`
  @impl true
  def handle(%{data: %{"type" => "Like"}} = object, meta) do
    target = Object.get_by_ap_id(object.data["object"])
    Utils.add_like_to_object(object, target)

    {:ok, notifications} = Notification.create_notifications(object)

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles (Create):
  # - Creates the object via `handle_object_creation/3`
  # - Rolls the transaction back if the object could not be created or the
  #   actor is unknown
  # - Increases the user note count and updates the last status timestamp
  #   (both only if public)
  # - Increases the replies count of the parent (not for Answers)
  # - Increases the quotes count of the quoted object
  # - Enqueues fetches for the listed replies while the thread distance allows
  # - Looks up the rich media card and adds the post to the search index
  # - Lets `Utils.maybe_handle_group_posts/1` process the activity
  # - Sets up notifications and streams the activity out
  @impl true
  def handle(%{data: %{"type" => "Create"}} = activity, meta) do
    with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
         %User{} = author <- User.get_cached_by_ap_id(activity.data["actor"]) do
      {:ok, notifications} = Notification.create_notifications(activity)
      {:ok, _user} = ActivityPub.increase_note_count_if_public(author, object)
      {:ok, _user} = ActivityPub.update_last_status_at_if_public(author, object)

      # `false` for Answers, otherwise whatever "inReplyTo" holds (possibly nil).
      parent = object.data["type"] != "Answer" && object.data["inReplyTo"]
      if parent, do: Object.increase_replies_count(parent)

      quoted = object.data["quoteUrl"]
      if quoted, do: Object.increase_quotes_count(quoted)

      depth = (meta[:depth] || 0) + 1
      replies = object.data["replies"]

      # FIXME: Force inReplyTo to replies
      if Pleroma.Web.Federator.allowed_thread_distance?(depth) and replies != nil do
        Enum.each(replies, fn reply_id ->
          %{"op" => "fetch_remote", "id" => reply_id, "depth" => depth}
          |> Pleroma.Workers.RemoteFetcherWorker.new()
          |> Oban.insert()
        end)
      end

      Pleroma.Web.RichMedia.Card.get_by_activity(activity)
      Pleroma.Search.add_to_index(Map.put(activity, :object, object))
      Utils.maybe_handle_group_posts(activity)

      meta = add_notifications(meta, notifications)
      ap_streamer().stream_out(activity)

      {:ok, activity, meta}
    else
      failure -> Repo.rollback(failure)
    end
  end
  # Tasks this handles (Announce):
  # - Adds the announce to the announced object
  # - Creates notifications and adds them to `meta`
  # - Streams out the announce, unless the actor is an internal user
  @impl true
  def handle(%{data: %{"type" => "Announce"}} = object, meta) do
    target = Object.get_by_ap_id(object.data["object"])
    announcer = User.get_cached_by_ap_id(object.data["actor"])

    Utils.add_announce_to_object(object, target)
    {:ok, notifications} = Notification.create_notifications(object)

    if !User.internal?(announcer) do
      ap_streamer().stream_out(object)
    end

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles (Undo):
  # - Looks up the activity being undone and hands it to `handle_undoing/1`
  #
  # Anything other than `:ok` from `handle_undoing/1` is returned as-is.
  @impl true
  def handle(%{data: %{"type" => "Undo", "object" => undone_ap_id}} = object, meta) do
    case undone_ap_id |> Activity.get_by_ap_id() |> handle_undoing() do
      :ok -> {:ok, object, meta}
      other -> other
    end
  end
  # Tasks this handles (EmojiReact):
  # - Adds the reaction to the object that was reacted to
  # - Creates notifications and adds them to `meta`
  @impl true
  def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
    target = Object.get_by_ap_id(object.data["object"])
    Utils.add_emoji_reaction_to_object(object, target)

    {:ok, notifications} = Notification.create_notifications(object)

    {:ok, object, add_notifications(meta, notifications)}
  end
  # Tasks this handles (Delete):
  # - Resolves the target to a known object or, failing that, a cached user
  # - For an object: deletes it, unpins it from its author, reduces the
  #   author's note count (if public), reduces the replies/quotes counts of
  #   the objects it referenced, deletes its chat message references and
  #   streams out the activity
  # - For a user: deletes the user
  # - Removes deleted objects from the search index
  #
  # Returns `{:ok, object, meta}` on success and `{:error, reason}` otherwise.
  @impl true
  def handle(%{data: %{"type" => "Delete", "object" => deleted_ref}} = object, meta) do
    target =
      Object.normalize(deleted_ref, fetch: false) ||
        User.get_cached_by_ap_id(deleted_ref)

    outcome =
      case target do
        %Object{} ->
          with {_, {:ok, removed, _activity}} <- {:object, Object.delete(target)},
               {_, actor} when is_binary(actor) <- {:actor, removed.data["actor"]},
               {_, %User{} = author} <- {:user, User.get_cached_by_ap_id(actor)} do
            User.remove_pinned_object_id(author, removed.data["id"])
            {:ok, author} = ActivityPub.decrease_note_count_if_public(author, removed)

            parent = removed.data["inReplyTo"]
            if parent, do: Object.decrease_replies_count(parent)

            quoted = removed.data["quoteUrl"]
            if quoted, do: Object.decrease_quotes_count(quoted)

            MessageReference.delete_for_object(removed)

            ap_streamer().stream_out(object)
            ap_streamer().stream_out_participations(removed, author)
            :ok
          else
            # The log messages show the object as it was before deletion.
            {:actor, _} ->
              @logger.error("The object doesn't have an actor: #{inspect(target)}")
              :no_object_actor

            {:user, _} ->
              @logger.error(
                "The object's actor could not be resolved to a user: #{inspect(target)}"
              )

              :no_object_user

            {:object, _} ->
              @logger.error("The object could not be deleted: #{inspect(target)}")
              {:error, object}
          end

        %User{} ->
          case User.delete(target) do
            {:ok, _} -> :ok
            other -> other
          end
      end

    case outcome do
      :ok ->
        # Only remove from index when deleting actual objects, not users or anything else
        if match?(%Pleroma.Object{}, target) do
          Pleroma.Search.remove_from_index(target)
        end

        {:ok, object, meta}

      failure ->
        {:error, failure}
    end
  end
  # Tasks this handles (Add):
  # - Adds the object to the actor's pinned objects
  # - Cancels the expiration job found for `meta[:activity_id]`, if there is one
  #
  # Returns `{:ok, object, meta}` on success, otherwise `{:error, reason}`:
  # - `:user_not_found` when the actor is not a known user
  # - `:pinned_statuses_limit_reached` when the changeset reports an error on
  #   `:pinned_objects`
  # - the changeset's error list for any other changeset failure
  @impl true
  def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
    with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
         {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
      # if pinned activity was scheduled for deletion, we remove job
      if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
        Oban.cancel_job(expiration.id)
      end

      {:ok, object, meta}
    else
      nil ->
        {:error, :user_not_found}

      {:error, changeset} ->
        if changeset.errors[:pinned_objects] do
          {:error, :pinned_statuses_limit_reached}
        else
          # Keep the error tuple shape: a bare keyword list would not be
          # recognisable as a failure by callers matching on `{:error, _}`.
          {:error, changeset.errors}
        end
    end
  end
  # Tasks this handles (Remove):
  # - Removes the object from the actor's pinned objects
  # - Deletes the matching Add activities addressed to the actor's featured
  #   collection
  # - Enqueues a new expiration job when `meta[:expires_at]` is set
  #
  # Returns `{:error, :user_not_found}` for an unknown actor; any other
  # non-matching result is returned as-is.
  @impl true
  def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
    with %User{} = actor <- User.get_cached_by_ap_id(data["actor"]),
         {:ok, _user} <- User.remove_pinned_object_id(actor, data["object"]) do
      add_activities = Activity.add_by_params_query(data["object"], actor.ap_id, actor.featured_address)
      Repo.delete_all(add_activities)

      # if pinned activity was scheduled for deletion, we reschedule it for deletion
      if meta[:expires_at] do
        # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
        {:ok, scheduled_at} =
          Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])

        Pleroma.Workers.PurgeExpiredActivity.enqueue(
          %{activity_id: meta[:activity_id]},
          scheduled_at: scheduled_at
        )
      end

      {:ok, object, meta}
    else
      nil -> {:error, :user_not_found}
      failure -> failure
    end
  end
  # Catch-all: anything not matched by the clauses above has no side effects
  # and is passed through unchanged.
  @impl true
  def handle(object, meta), do: {:ok, object, meta}
  # Applies an Update that targets a user.
  #
  # When `meta` carries a `:user_update_changeset`, that changeset is applied.
  # Otherwise a changeset is built from the embedded user object. The result
  # of the update itself is not inspected; the activity is always passed
  # through as `{:ok, object, meta}`.
  defp handle_update_user(
         %{data: %{"type" => "Update", "object" => updated_object}} = object,
         meta
       ) do
    changeset =
      Keyword.get(meta, :user_update_changeset) || remote_user_update_changeset(updated_object)

    User.update_and_set_cache(changeset)

    {:ok, object, meta}
  end

  # Builds the changeset for the user named by the embedded object's "id",
  # using the data extracted from that embedded object.
  defp remote_user_update_changeset(updated_object) do
    {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)

    updated_object["id"]
    |> User.get_by_ap_id()
    |> User.remote_user_changeset(new_user_data)
  end
  # Applies an Update that targets a non-user object.
  #
  # The stored object is looked up by the embedded object's "id". It is only
  # updated when it exists and its type is one of
  # `Pleroma.Constants.updatable_object_types()`; in every other case this is
  # a no-op. When the updater reports a change, the normalized activity is
  # handed to `ActivityPub.notify_and_stream/1`.
  #
  # Always returns `{:ok, object, meta}`.
  defp handle_update_object(
         %{data: %{"type" => "Update", "object" => updated_object}} = object,
         meta
       ) do
    new_data =
      if meta[:local] do
        # If this is a local Update, we don't process it by transmogrifier,
        # so we use the embedded object as-is.
        updated_object
      else
        meta[:object_data]
      end

    # Matching on %Object{} guards against an unknown AP id: the lookup then
    # returns nil, and reading `:data` from nil would raise a BadMapError.
    with %Object{data: orig_data} = orig_object <- Object.get_by_ap_id(updated_object["id"]),
         true <- orig_data["type"] in Pleroma.Constants.updatable_object_types() do
      {:ok, _, updated} = Object.Updater.do_update_and_invalidate_cache(orig_object, new_data)

      if updated do
        object
        |> Activity.normalize()
        |> ActivityPub.notify_and_stream()
      end
    end

    {:ok, object, meta}
  end
  # Creation of a ChatMessage.
  #
  # The object is run through the common pipeline. Then, for each local
  # participant (actor and first recipient, de-duplicated when they are the
  # same user), the chat is bumped or created, a message reference is created
  # and the idempotency key from `meta` is cached under the reference id. The
  # resulting chat updates are added to `meta` as streamables.
  def handle_object_creation(%{"type" => "ChatMessage"} = object, _activity, meta) do
    with {:ok, object, meta} <- Pipeline.common_pipeline(object, meta) do
      sender = User.get_cached_by_ap_id(object.data["actor"])
      recipient = User.get_cached_by_ap_id(hd(object.data["to"]))

      participants = Enum.uniq([[sender, recipient], [recipient, sender]])

      streamables =
        for [user, other_user] <- participants, user.local do
          {:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)

          # The third argument is true for everyone but the sender.
          {:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != sender.ap_id)

          @cachex.put(
            :chat_message_id_idempotency_key_cache,
            cm_ref.id,
            meta[:idempotency_key]
          )

          {["user", "user:pleroma_chat"], {user, %{cm_ref | chat: chat, object: object}}}
        end

      {:ok, object, add_streamables(meta, streamables)}
    end
  end
  # Creation of a Question: runs the common pipeline, then schedules the poll
  # end for the creating activity. A pipeline failure is returned as-is.
  def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
    case Pipeline.common_pipeline(object, meta) do
      {:ok, created, meta} ->
        PollWorker.schedule_poll_end(activity)
        {:ok, created, meta}

      other ->
        other
    end
  end

  # Creation of an Answer: runs the common pipeline, then increases the vote
  # count using the answer's "inReplyTo", "name" and "actor".
  def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
    case Pipeline.common_pipeline(object_map, meta) do
      {:ok, answer, meta} ->
        Object.increase_vote_count(
          answer.data["inReplyTo"],
          answer.data["name"],
          answer.data["actor"]
        )

        {:ok, answer, meta}

      other ->
        other
    end
  end

  # The remaining known object types only go through the common pipeline; its
  # result, success or not, is returned unchanged.
  def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
      when objtype in ~w[Audio Video Image Event Article Note Page] do
    Pipeline.common_pipeline(object, meta)
  end

  # Nothing to do for anything else.
  def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
  # Undoes a like. When the liked object is not known, only the like activity
  # is deleted; otherwise the like is first removed from the object, and the
  # activity is deleted only if that removal succeeds.
  defp undo_like(nil, object) do
    delete_object(object)
  end

  defp undo_like(%Object{} = liked_object, object) do
    case Utils.remove_like_from_object(object, liked_object) do
      {:ok, _} -> delete_object(object)
      other -> other
    end
  end
  # Reverts the effects of the given activity and deletes it.
  #
  # Returns `:ok` on success. A failing step's result is returned as-is, and
  # activities this function has no clause for yield an `{:error, _}` tuple.
  def handle_undoing(%{data: %{"type" => "Like"}} = object) do
    liked_object = Object.get_by_ap_id(object.data["object"])
    undo_like(liked_object, object)
  end

  def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
    with %Object{} = target <- Object.get_by_ap_id(object.data["object"]),
         {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end

  def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
    with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
         {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end

  def handle_undoing(
        %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
          object
      ) do
    with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
         %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
         {:ok, _} <- User.unblock(blocker, blocked),
         {:ok, _} <- Repo.delete(object),
         do: :ok
  end

  def handle_undoing(object) do
    {:error, ["don't know how to handle", object]}
  end
  # Deletes the given record through the repo, collapsing a successful result
  # to a bare `:ok`; any other result is returned unchanged.
  @spec delete_object(Activity.t()) :: :ok | {:error, Ecto.Changeset.t()}
  defp delete_object(object) do
    case Repo.delete(object) do
      {:ok, _} -> :ok
      other -> other
    end
  end
  # Streams the notifications collected under `:notifications` in `meta`
  # (none when the key is absent) and returns `meta` unchanged.
  defp stream_notifications(meta) do
    notifications = Keyword.get(meta, :notifications, [])
    Notification.stream(notifications)

    meta
  end
  # Hands every `{topics, items}` pair collected under `:streamables` in
  # `meta` to the streamer and returns `meta` unchanged.
  defp send_streamables(meta) do
    for {topics, items} <- Keyword.get(meta, :streamables, []) do
      Streamer.stream(topics, items)
    end

    meta
  end
  # Puts `streamables` in front of whatever is already stored under
  # `:streamables` in `meta` and returns the updated keyword list.
  defp add_streamables(meta, streamables) do
    combined = streamables ++ Keyword.get(meta, :streamables, [])
    Keyword.put(meta, :streamables, combined)
  end
  # Puts `notifications` in front of whatever is already stored under
  # `:notifications` in `meta` and returns the updated keyword list.
  defp add_notifications(meta, notifications) do
    combined = notifications ++ Keyword.get(meta, :notifications, [])
    Keyword.put(meta, :notifications, combined)
  end
  # Streams the notifications first and then the streamables collected in
  # `meta`. Returns `meta` unchanged.
  @impl true
  def handle_after_transaction(meta) do
    notified = stream_notifications(meta)
    send_streamables(notified)
  end
  532. end