logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma

federator.ex (3721B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Federator do
  5. alias Pleroma.Activity
  6. alias Pleroma.Object.Containment
  7. alias Pleroma.User
  8. alias Pleroma.Web.ActivityPub.ActivityPub
  9. alias Pleroma.Web.ActivityPub.Transmogrifier
  10. alias Pleroma.Web.ActivityPub.Utils
  11. alias Pleroma.Web.Federator.Publisher
  12. alias Pleroma.Workers.PublisherWorker
  13. alias Pleroma.Workers.ReceiverWorker
  14. require Logger
  15. @doc """
  16. Returns `true` if the distance to target object does not exceed max configured value.
  17. Serves to prevent fetching of very long threads, especially useful on smaller instances.
  18. Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
  19. Applies to fetching of both ancestor (reply-to) and child (reply) objects.
  20. """
  21. # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
  22. def allowed_thread_distance?(distance) do
  23. max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
  24. if max_distance && max_distance >= 0 do
  25. # Default depth is 0 (an object has zero distance from itself in its thread)
  26. (distance || 0) <= max_distance
  27. else
  28. true
  29. end
  30. end
  31. # Client API
  32. def incoming_ap_doc(params) do
  33. ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
  34. end
  35. def publish(%{id: "pleroma:fakeid"} = activity) do
  36. perform(:publish, activity)
  37. end
  38. def publish(activity) do
  39. PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
  40. end
  41. # Job Worker Callbacks
  42. @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
  43. def perform(:publish_one, module, params) do
  44. apply(module, :publish_one, [params])
  45. end
  46. def perform(:publish, activity) do
  47. Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
  48. with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
  49. {:ok, actor} <- User.ensure_keys_present(actor) do
  50. Publisher.publish(actor, activity)
  51. end
  52. end
  53. def perform(:incoming_ap_doc, params) do
  54. Logger.debug("Handling incoming AP activity")
  55. actor =
  56. params
  57. |> Map.get("actor")
  58. |> Utils.get_ap_id()
  59. # NOTE: we use the actor ID to do the containment, this is fine because an
  60. # actor shouldn't be acting on objects outside their own AP server.
  61. with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
  62. nil <- Activity.normalize(params["id"]),
  63. {_, :ok} <-
  64. {:correct_origin?, Containment.contain_origin_from_id(actor, params)},
  65. {:ok, activity} <- Transmogrifier.handle_incoming(params) do
  66. {:ok, activity}
  67. else
  68. {:correct_origin?, _} ->
  69. Logger.debug("Origin containment failure for #{params["id"]}")
  70. {:error, :origin_containment_failed}
  71. %Activity{} ->
  72. Logger.debug("Already had #{params["id"]}")
  73. {:error, :already_present}
  74. {:actor, e} ->
  75. Logger.debug("Unhandled actor #{actor}, #{inspect(e)}")
  76. {:error, e}
  77. {:error, {:validate_object, _}} = e ->
  78. Logger.error("Incoming AP doc validation error: #{inspect(e)}")
  79. Logger.debug(Jason.encode!(params, pretty: true))
  80. e
  81. e ->
  82. # Just drop those for now
  83. Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
  84. {:error, e}
  85. end
  86. end
  87. def ap_enabled_actor(id) do
  88. user = User.get_cached_by_ap_id(id)
  89. if User.ap_enabled?(user) do
  90. {:ok, user}
  91. else
  92. ActivityPub.make_user_from_ap_id(id)
  93. end
  94. end
  95. end