logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma — git clone https://hacktivis.me/git/pleroma.git

federator.ex (4019B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Federator do
  5. alias Pleroma.Activity
  6. alias Pleroma.Object.Containment
  7. alias Pleroma.User
  8. alias Pleroma.Web.ActivityPub.Publisher
  9. alias Pleroma.Web.ActivityPub.Transmogrifier
  10. alias Pleroma.Web.ActivityPub.Utils
  11. alias Pleroma.Workers.PublisherWorker
  12. alias Pleroma.Workers.ReceiverWorker
  13. require Logger
  14. @behaviour Pleroma.Web.Federator.Publishing
  @doc """
  Tells whether an object at `distance` from the target object is still within the
  configured maximum thread depth.

  The limit is read from `[:instance, :federation_incoming_replies_max_depth]`. A `nil`
  `distance` is treated as 0. When the limit is unset (`nil`/`false`) or negative, every
  distance is allowed.

  Bounding the depth prevents fetching of very long threads, which is especially useful on
  smaller instances, and addresses
  [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
  It applies to fetching of both ancestor (reply-to) and child (reply) objects.
  """
  # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
  def allowed_thread_distance?(distance) do
    case Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) do
      # No limit configured: nothing to enforce.
      unset when unset in [nil, false] ->
        true

      # Default depth is 0 (an object has zero distance from itself in its thread)
      limit when limit >= 0 ->
        (distance || 0) <= limit

      # A negative limit disables the check.
      _negative ->
        true
    end
  end
  # Client API

  @doc """
  Enqueues an `"incoming_ap_doc"` job on `ReceiverWorker` for an incoming document.

  Three input shapes are accepted:

    * a map with atom keys `:params` and `:req_headers` — both are forwarded to the job
      together with a 20 second `"timeout"`, enqueued with `priority: 2`;
    * a document whose `"type"` is `"Delete"` — enqueued with `priority: 3`;
    * any other document — enqueued without extra options.

  Returns whatever `ReceiverWorker.enqueue` returns.
  """
  def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
    job_args = %{
      "req_headers" => req_headers,
      "params" => params,
      "timeout" => :timer.seconds(20)
    }

    ReceiverWorker.enqueue("incoming_ap_doc", job_args, priority: 2)
  end

  def incoming_ap_doc(%{"type" => "Delete"} = delete_doc) do
    job_args = %{"params" => delete_doc}
    ReceiverWorker.enqueue("incoming_ap_doc", job_args, priority: 3)
  end

  def incoming_ap_doc(doc) do
    ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => doc})
  end
  # Publishes an activity.
  #
  # An activity carrying the placeholder id "pleroma:fakeid" is published inline through
  # `perform/2`; it is not handed to `PublisherWorker`.
  @impl true
  def publish(%{id: "pleroma:fakeid"} = activity), do: perform(:publish, activity)

  # Any other activity is handed to `PublisherWorker` by id, with a job priority derived
  # from the activity type.
  @impl true
  def publish(%Pleroma.Activity{id: activity_id, data: %{"type" => type}}) do
    PublisherWorker.enqueue(
      "publish",
      %{"activity_id" => activity_id},
      priority: publish_priority(type)
    )
  end

  # Maps an activity type to the job priority used when enqueueing its publication:
  # 3 for "Delete", 0 for everything else.
  defp publish_priority(type) do
    case type do
      "Delete" -> 3
      _other -> 0
    end
  end
  # Job Worker Callbacks

  @spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}

  # :publish_one — delegates straight to the publisher with the given params.
  def perform(:publish_one, params) do
    Publisher.publish_one(params)
  end

  # :publish — looks up the activity's actor in the user cache and publishes the activity
  # on that actor's behalf.
  def perform(:publish, activity) do
    Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)

    actor_ap_id = activity.data["actor"]

    # Assertive match: a missing actor raises a MatchError here.
    %User{} = actor = User.get_cached_by_ap_id(actor_ap_id)

    Publisher.publish(actor, activity)
  end
  # :incoming_ap_doc — processes an incoming ActivityPub document.
  #
  # The document is handed to the Transmogrifier only when its actor can be resolved, no
  # activity with the same id is already known, and the origin containment check passes.
  #
  # Returns `{:ok, activity}` on success. Failures are reported as:
  #   * `{:error, :origin_containment_failed}` — containment check did not return `:ok`
  #   * `{:error, :already_present}` — an activity with this id already exists
  #   * `{:error, reason}` — the actor could not be resolved (`reason` is the raw lookup result)
  #   * `{:error, {:validate_object, _}}` — passed through unchanged
  #   * `{:error, other}` — any other non-matching value, wrapped as-is
  def perform(:incoming_ap_doc, params) do
    Logger.debug("Handling incoming AP activity")

    actor_id = Utils.get_ap_id(Map.get(params, "actor"))

    # NOTE: we use the actor ID to do the containment, this is fine because an
    # actor shouldn't be acting on objects outside their own AP server.
    with {_, {:ok, _user}} <- {:actor, User.get_or_fetch_by_ap_id(actor_id)},
         nil <- Activity.normalize(params["id"]),
         {_, :ok} <- {:correct_origin?, Containment.contain_origin_from_id(actor_id, params)},
         {:ok, activity} <- Transmogrifier.handle_incoming(params) do
      {:ok, activity}
    else
      {:correct_origin?, _} ->
        Logger.debug("Origin containment failure for #{params["id"]}")
        {:error, :origin_containment_failed}

      %Activity{} ->
        Logger.debug("Already had #{params["id"]}")
        {:error, :already_present}

      {:actor, lookup_result} ->
        Logger.debug("Unhandled actor #{actor_id}, #{inspect(lookup_result)}")
        {:error, lookup_result}

      {:error, {:validate_object, _}} = validation_error ->
        Logger.error("Incoming AP doc validation error: #{inspect(validation_error)}")
        Logger.debug(Jason.encode!(params, pretty: true))
        validation_error

      unhandled ->
        # Just drop those for now
        Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
        {:error, unhandled}
    end
  end
  99. end