logo

pleroma

My custom branch(es) of git.pleroma.social/pleroma/pleroma — git clone https://anongit.hacktivis.me/git/pleroma.git/

publisher.ex (11244B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.Publisher do
  5. alias Pleroma.Activity
  6. alias Pleroma.Config
  7. alias Pleroma.Delivery
  8. alias Pleroma.HTTP
  9. alias Pleroma.Instances
  10. alias Pleroma.Object
  11. alias Pleroma.Repo
  12. alias Pleroma.User
  13. alias Pleroma.Web.ActivityPub.Publisher.Prepared
  14. alias Pleroma.Web.ActivityPub.Relay
  15. alias Pleroma.Web.ActivityPub.Transmogrifier
  16. alias Pleroma.Workers.PublisherWorker
  17. require Pleroma.Constants
  18. import Pleroma.Web.ActivityPub.Visibility
  19. require Logger
  20. @moduledoc """
  21. ActivityPub outgoing federation module.
  22. """
  @doc """
  Schedules delivery of one activity to one inbox.

  `params` is wrapped in a `"publish_one"` job for `PublisherWorker`;
  `worker_args` are passed through unchanged to `PublisherWorker.new/2`
  (the general `publish/2` clause uses this to set `:priority`).
  Returns whatever `Oban.insert/1` returns.
  """
  @spec enqueue_one(map(), Keyword.t()) :: {:ok, %Oban.Job{}}
  def enqueue_one(%{} = params, worker_args \\ []) do
    job_args = %{"op" => "publish_one", "params" => params}

    job_args
    |> PublisherWorker.new(worker_args)
    |> Oban.insert()
  end
  @doc """
  Gathers a set of remote users given an IR envelope.

  Addresses are collected from the envelope's `"to"` (required), `"cc"` and
  `"bcc"` fields. Each `"bcc"` entry is treated as a list AP id: when it
  resolves to a `Pleroma.List` owned by the given user, it is expanded to the
  AP ids of the accounts on that list; any other `"bcc"` entry is dropped.

  Every collected AP id is looked up with `User.get_cached_by_ap_id/1`; ids
  that do not resolve and users flagged as local are filtered out.
  """
  def remote_users(%User{id: user_id}, %{data: %{"to" => to} = data}) do
    cc = Map.get(data, "cc", [])

    # flat_map keeps the original ordering while avoiding the repeated
    # `acc ++ list` copies of a reduce-based accumulation.
    bcc =
      data
      |> Map.get("bcc", [])
      |> Enum.flat_map(fn ap_id ->
        case Pleroma.List.get_by_ap_id(ap_id) do
          # Only lists owned by the publishing user may be expanded.
          %Pleroma.List{user_id: ^user_id} = list ->
            {:ok, following} = Pleroma.List.get_following(list)
            Enum.map(following, & &1.ap_id)

          _ ->
            []
        end
      end)

    [to, cc, bcc]
    |> Enum.concat()
    |> Enum.map(&User.get_cached_by_ap_id/1)
    |> Enum.filter(fn user -> user && !user.local end)
  end
  @doc """
  Tells whether an activity can be prepared for federation.

  Returns `true` when `Transmogrifier.prepare_outgoing/1` yields an `{:ok, _}`
  tuple for the activity's data, and `false` for any other result.
  """
  def representable?(%Activity{} = activity) do
    match?({:ok, _prepared}, Transmogrifier.prepare_outgoing(activity.data))
  end
  @doc """
  Builds the signed delivery payload for an Oban publishing job.

  Expected keys in `params`:

  * `inbox`: the inbox to publish to
  * `activity_id`: the internal activity id
  * `cc`: the cc recipients relevant to this inbox (optional, defaults to `[]`)
  * `unreachable_since`: copied into the result as-is (optional)

  The activity is loaded together with its actor, run through
  `Transmogrifier.prepare_outgoing/1`, its `"cc"` is replaced with the given
  one, and the JSON body is signed on behalf of the actor.
  """
  @spec prepare_one(map()) :: Prepared.t()
  def prepare_one(%{inbox: inbox, activity_id: activity_id} = params) do
    activity = Activity.get_by_id_with_user_actor(activity_id)
    signer = activity.user_actor
    activity_ap_id = activity.data["id"]

    Logger.debug("Federating #{activity_ap_id} to #{inbox}")

    %{path: request_path} = inbox_uri = URI.parse(inbox)

    {:ok, outgoing} = Transmogrifier.prepare_outgoing(activity.data)

    body =
      outgoing
      |> Map.put("cc", Map.get(params, :cc, []))
      |> Jason.encode!()

    body_digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, body))
    signed_at = Pleroma.Signature.signed_date()

    signed_headers = %{
      "(request-target)": "post #{request_path}",
      host: signature_host(inbox_uri),
      "content-length": byte_size(body),
      digest: body_digest,
      date: signed_at
    }

    %Prepared{
      activity_id: activity_id,
      json: body,
      date: signed_at,
      signature: Pleroma.Signature.sign(signer, signed_headers),
      digest: body_digest,
      inbox: inbox,
      unreachable_since: params[:unreachable_since]
    }
  end
  @doc """
  Delivers one prepared message to a peer inbox via HTTP POST.

  Fields read from the `Prepared` struct:

  * `activity_id`: the activity id (used for log metadata)
  * `json`: the json payload
  * `date`: the signed date from Pleroma.Signature.signed_date()
  * `signature`: the signature from Pleroma.Signature.sign/2
  * `digest`: base64 encoded the hash of the json payload prefixed with "SHA-256="
  * `inbox`: the inbox URI of this delivery
  * `unreachable_since`: timestamp the instance was marked unreachable

  Results:

  * a 2xx response is returned unchanged; the inbox is marked reachable when
    `unreachable_since` was set
  * any other response carrying a status yields `{:cancel, reason}` for
    400/403/404/410 and `{:error, response_tuple}` otherwise
  * `{:error, {:already_started, _}}` and `{:error, :pool_full}` yield a snooze
  * everything else yields `{:error, result}`

  For the failures other than the two snooze cases, the inbox is marked
  unreachable unless `unreachable_since` was already set.
  """
  def publish_one(%Prepared{} = p) do
    headers = [
      {"Content-Type", "application/activity+json"},
      {"Date", p.date},
      {"signature", p.signature},
      {"digest", p.digest}
    ]

    case HTTP.post(p.inbox, p.json, headers) do
      {:ok, %{status: code}} = result when code in 200..299 ->
        if p.unreachable_since != nil do
          Instances.set_reachable(p.inbox)
        end

        result

      {_post_result, %{status: code}} = e ->
        flag_unreachable_once(p)

        Logger.metadata(activity: p.activity_id, inbox: p.inbox, status: code)
        Logger.error("Publisher failed to inbox #{p.inbox} with status #{code}")

        case code do
          400 -> {:cancel, :bad_request}
          403 -> {:cancel, :forbidden}
          not_found when not_found in [404, 410] -> {:cancel, :not_found}
          _ -> {:error, e}
        end

      {:error, {:already_started, _}} ->
        Logger.debug("Publisher snoozing worker job due worker :already_started race condition")
        connection_pool_snooze()

      {:error, :pool_full} ->
        Logger.debug("Publisher snoozing worker job due to full connection pool")
        connection_pool_snooze()

      e ->
        flag_unreachable_once(p)

        Logger.metadata(activity: p.activity_id, inbox: p.inbox)
        Logger.error("Publisher failed to inbox #{p.inbox} #{inspect(e)}")
        {:error, e}
    end
  end

  # Marks the inbox unreachable, but only when the delivery was not already
  # carrying an `unreachable_since` value.
  defp flag_unreachable_once(%Prepared{unreachable_since: nil, inbox: inbox}) do
    Instances.set_unreachable(inbox)
  end

  defp flag_unreachable_once(%Prepared{}), do: nil
  # Result returned by publish_one/1 for the :already_started and :pool_full
  # connection pool errors: a snooze tuple with the value 3.
  defp connection_pool_snooze do
    {:snooze, 3}
  end
  # Host value used when signing: the bare host when the URI uses the default
  # port of its scheme, "host:port" otherwise.
  defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
    case URI.default_port(scheme) do
      default when default == port -> host
      _other -> "#{host}:#{port}"
    end
  end
  @doc """
  Decides whether an activity may be delivered to `inbox`.

  * a `nil` inbox is never federated to
  * when the second argument is `true`, delivery is always allowed
  * otherwise delivery is allowed unless the inbox host matches one of the
    configured `[:instance, :quarantined_instances]` entries
  """
  def should_federate?(nil, _public), do: false
  def should_federate?(_inbox, true), do: true

  def should_federate?(inbox, _public) do
    inbox_host = URI.parse(inbox).host

    quarantine_patterns =
      [:instance, :quarantined_instances]
      |> Config.get([])
      |> Pleroma.Web.ActivityPub.MRF.instance_list_from_tuples()
      |> Pleroma.Web.ActivityPub.MRF.subdomains_regex()

    !Pleroma.Web.ActivityPub.MRF.subdomain_match?(quarantine_patterns, inbox_host)
  end
  # Splits the delivery targets of an activity into two groups:
  # `[mentioned, non_mentioned]`.
  #
  # * mentioned: the remote users addressed by the activity (remote_users/2)
  # * non_mentioned: the actor's external followers (only when the actor's
  #   follower address is among the activity recipients) plus, for Delete
  #   activities, the users the object was delivered to; minus anyone already
  #   in the mentioned group
  @spec recipients(User.t(), Activity.t()) :: [[User.t()]]
  defp recipients(actor, activity) do
    followers =
      case actor.follower_address in activity.recipients do
        true -> User.get_external_followers(actor)
        false -> []
      end

    fetchers = delete_fetchers(activity)
    mentioned = remote_users(actor, activity)

    [mentioned, (followers ++ fetchers) -- mentioned]
  end

  # For a Delete activity whose object normalizes (without fetching) to an
  # Object, returns the users that object was delivered to and then removes
  # the delivery records for that object. Returns [] in every other case.
  defp delete_fetchers(%Activity{data: %{"type" => "Delete"}} = activity) do
    case Object.normalize(activity, fetch: false) do
      %Object{id: object_id} ->
        delivered_to = User.get_delivered_users_by_object_id(object_id)
        Delivery.delete_all_by_object_id(object_id)
        delivered_to

      _ ->
        []
    end
  end

  defp delete_fetchers(_activity), do: []
  # Returns the AP ids of all `recipients` whose AP id has the same host as
  # the given `ap_id`.
  defp get_cc_ap_ids(ap_id, recipients) do
    target_host = URI.parse(ap_id).host

    same_host? = fn %User{ap_id: candidate} ->
      URI.parse(candidate).host == target_host
    end

    recipients
    |> Enum.filter(same_host?)
    |> Enum.map(fn user -> user.ap_id end)
  end
  # Picks the user's shared inbox, falling back to the personal inbox when no
  # shared inbox is set (nil).
  defp maybe_use_sharedinbox(%User{shared_inbox: shared_inbox} = user) do
    case shared_inbox do
      nil -> user.inbox
      _ -> shared_inbox
    end
  end
  @doc """
  Chooses which inbox of `user` an activity should be delivered to.

  The choice follows an approximation of the ``sharedInbox`` rules in the
  [ActivityPub specification][ap-sharedinbox]. The shared inbox (when the
  user has one) is preferred if any of the following holds:

  * the activity type is `"Delete"`
  * the public collection appears in `"to"` or `"cc"`
  * `"to"` and `"cc"` together hold more than one address

  In every other case the user's personal inbox is returned.

  Please read the spec before editing this function (or the helpers it
  calls); changes made without that background are likely to break delivery.

  [ap-sharedinbox]: https://www.w3.org/TR/activitypub/#shared-inbox-delivery
  """
  def determine_inbox(
        %Activity{data: activity_data},
        %User{inbox: inbox} = user
      ) do
    to = activity_data["to"] || []
    cc = activity_data["cc"] || []
    type = activity_data["type"]

    shared_delivery? =
      type == "Delete" or
        Pleroma.Constants.as_public() in to or
        Pleroma.Constants.as_public() in cc or
        length(to) + length(cc) > 1

    if shared_delivery? do
      maybe_use_sharedinbox(user)
    else
      inbox
    end
  end
  @doc """
  Publishes an activity to all relevant peers.

  Two strategies exist, selected by the activity's `"bcc"` field:

  * non-empty `"bcc"` list: one job is enqueued per target inbox, and each job
    carries a `cc` listing every recipient on the same host as the user that
    inbox was derived from. Jobs are enqueued inside `Repo.checkout/1`.
  * otherwise: public activities are additionally handed to the relay when
    `[:instance, :allow_relay]` is set, and one job is enqueued per inbox with
    priority `0` for mentioned recipients and `1` for the rest.

  In both cases inboxes are chosen with `determine_inbox/2`, de-duplicated,
  filtered with `should_federate?/2` and `Instances.filter_reachable/1`.
  """
  def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
      when is_list(bcc) and bcc != [] do
    public = public?(activity)

    [priority_recipients, recipients] = recipients(actor, activity)

    # Inboxes are derived from both groups, so the owner lookup and the cc
    # computation below have to look at both groups too. Searching only the
    # non-priority group made the lookup return nil (and the match crash) for
    # inboxes that belong to mentioned users.
    all_recipients = priority_recipients ++ recipients

    priority_inboxes = federated_inboxes(priority_recipients, activity, public)

    # An inbox already served in the priority group must not get a second job.
    other_inboxes = federated_inboxes(recipients, activity, public) -- priority_inboxes

    reachable_groups = Enum.map([priority_inboxes, other_inboxes], &Instances.filter_reachable/1)

    Repo.checkout(fn ->
      Enum.each(reachable_groups, fn reachable ->
        Enum.each(reachable, fn {inbox, unreachable_since} ->
          # The inbox may be a shared inbox, so it has to be matched through
          # determine_inbox/2 rather than against the user's personal inbox.
          %User{ap_id: ap_id} =
            Enum.find(all_recipients, fn user -> determine_inbox(activity, user) == inbox end)

          # Get all the recipients on the same host and add them to cc. Otherwise, a remote
          # instance would only accept a first message for the first recipient and ignore the rest.
          cc = get_cc_ap_ids(ap_id, all_recipients)

          __MODULE__.enqueue_one(%{
            inbox: inbox,
            cc: cc,
            activity_id: activity.id,
            unreachable_since: unreachable_since
          })
        end)
      end)
    end)
  end

  # Publishes an activity to all relevant peers.
  def publish(%User{} = actor, %Activity{} = activity) do
    public = public?(activity)

    if public && Config.get([:instance, :allow_relay]) do
      Logger.debug(fn -> "Relaying #{activity.data["id"]} out" end)
      Relay.publish(activity)
    end

    [priority_recipients, recipients] = recipients(actor, activity)

    priority_inboxes = federated_inboxes(priority_recipients, activity, public)
    inboxes = federated_inboxes(recipients, activity, public) -- priority_inboxes

    [{priority_inboxes, 0}, {inboxes, 1}]
    |> Enum.each(fn {inboxes, priority} ->
      inboxes
      |> Instances.filter_reachable()
      |> Enum.each(fn {inbox, unreachable_since} ->
        __MODULE__.enqueue_one(
          %{
            inbox: inbox,
            activity_id: activity.id,
            unreachable_since: unreachable_since
          },
          priority: priority
        )
      end)
    end)

    :ok
  end

  # Maps users to the unique inboxes the activity may be federated to.
  defp federated_inboxes(users, activity, public) do
    users
    |> Enum.map(fn %User{} = user -> determine_inbox(activity, user) end)
    |> Enum.uniq()
    |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
  end
  @doc """
  Builds the WebFinger link entries for a user: two `"self"` links pointing at
  the user's AP id (one per ActivityPub media type) followed by the OStatus
  subscribe template rooted at the endpoint URL.
  """
  def gather_webfinger_links(%User{} = user) do
    self_types = [
      "application/activity+json",
      "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""
    ]

    self_links =
      Enum.map(self_types, fn type ->
        %{"rel" => "self", "type" => type, "href" => user.ap_id}
      end)

    subscribe_link = %{
      "rel" => "http://ostatus.org/schema/1.0/subscribe",
      "template" => "#{Pleroma.Web.Endpoint.url()}/ostatus_subscribe?acct={uri}"
    }

    self_links ++ [subscribe_link]
  end
  @doc """
  Returns the list of protocol names this module contributes to nodeinfo.
  """
  def gather_nodeinfo_protocol_names do
    ["activitypub"]
  end
  322. end