logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

federator.ex (4115B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.Federator do
  5. alias Pleroma.Activity
  6. alias Pleroma.Object.Containment
  7. alias Pleroma.User
  8. alias Pleroma.Web.ActivityPub.Publisher
  9. alias Pleroma.Web.ActivityPub.Transmogrifier
  10. alias Pleroma.Web.ActivityPub.Utils
  11. alias Pleroma.Workers.PublisherWorker
  12. alias Pleroma.Workers.ReceiverWorker
  13. require Logger
  14. @behaviour Pleroma.Web.Federator.Publishing
  15. @doc """
  16. Returns `true` if the distance to target object does not exceed max configured value.
  17. Serves to prevent fetching of very long threads, especially useful on smaller instances.
  18. Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
  19. Applies to fetching of both ancestor (reply-to) and child (reply) objects.
  20. """
  21. # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
  22. def allowed_thread_distance?(distance) do
  23. max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
  24. if max_distance && max_distance >= 0 do
  25. # Default depth is 0 (an object has zero distance from itself in its thread)
  26. (distance || 0) <= max_distance
  27. else
  28. true
  29. end
  30. end
  31. # Client API
  32. def incoming_ap_doc(%{params: params, req_headers: req_headers}) do
  33. ReceiverWorker.new(
  34. %{
  35. "op" => "incoming_ap_doc",
  36. "req_headers" => req_headers,
  37. "params" => params,
  38. "timeout" => :timer.seconds(20)
  39. },
  40. priority: 2
  41. )
  42. |> Oban.insert()
  43. end
  44. def incoming_ap_doc(%{"type" => "Delete"} = params) do
  45. ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params},
  46. priority: 3,
  47. queue: :slow
  48. )
  49. |> Oban.insert()
  50. end
  51. def incoming_ap_doc(params) do
  52. ReceiverWorker.new(%{"op" => "incoming_ap_doc", "params" => params})
  53. |> Oban.insert()
  54. end
  55. @impl true
  56. def publish(%{id: "pleroma:fakeid"} = activity) do
  57. perform(:publish, activity)
  58. end
  59. @impl true
  60. def publish(%Pleroma.Activity{data: %{"type" => type}} = activity) do
  61. PublisherWorker.new(%{"op" => "publish", "activity_id" => activity.id},
  62. priority: publish_priority(type)
  63. )
  64. |> Oban.insert()
  65. end
  66. defp publish_priority("Delete"), do: 3
  67. defp publish_priority(_), do: 0
  68. # Job Worker Callbacks
  69. @spec perform(atom(), any()) :: {:ok, any()} | {:error, any()}
  70. def perform(:publish_one, params) do
  71. Publisher.prepare_one(params)
  72. |> Publisher.publish_one()
  73. end
  74. def perform(:publish, activity) do
  75. Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
  76. %User{} = actor = User.get_cached_by_ap_id(activity.data["actor"])
  77. Publisher.publish(actor, activity)
  78. end
  79. def perform(:incoming_ap_doc, params) do
  80. Logger.debug("Handling incoming AP activity")
  81. actor =
  82. params
  83. |> Map.get("actor")
  84. |> Utils.get_ap_id()
  85. # NOTE: we use the actor ID to do the containment, this is fine because an
  86. # actor shouldn't be acting on objects outside their own AP server.
  87. with {_, {:ok, user}} <- {:actor, User.get_or_fetch_by_ap_id(actor)},
  88. {:user_active, true} <- {:user_active, match?(true, user.is_active)},
  89. nil <- Activity.normalize(params["id"]),
  90. {_, :ok} <-
  91. {:correct_origin?, Containment.contain_origin_from_id(actor, params)},
  92. {:ok, activity} <- Transmogrifier.handle_incoming(params) do
  93. {:ok, activity}
  94. else
  95. {:correct_origin?, _} ->
  96. Logger.debug("Origin containment failure for #{params["id"]}")
  97. {:error, :origin_containment_failed}
  98. %Activity{} ->
  99. Logger.debug("Already had #{params["id"]}")
  100. {:error, :already_present}
  101. {:actor, e} ->
  102. Logger.debug("Unhandled actor #{actor}, #{inspect(e)}")
  103. {:error, e}
  104. e ->
  105. # Just drop those for now
  106. Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
  107. {:error, e}
  108. end
  109. end
  110. end