logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

publisher.ex (10397B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.Publisher do
  5. alias Pleroma.Activity
  6. alias Pleroma.Config
  7. alias Pleroma.Delivery
  8. alias Pleroma.HTTP
  9. alias Pleroma.Instances
  10. alias Pleroma.Object
  11. alias Pleroma.Repo
  12. alias Pleroma.User
  13. alias Pleroma.Web.ActivityPub.Relay
  14. alias Pleroma.Web.ActivityPub.Transmogrifier
  15. alias Pleroma.Workers.PublisherWorker
  16. require Pleroma.Constants
  17. import Pleroma.Web.ActivityPub.Visibility
  18. require Logger
  19. @moduledoc """
  20. ActivityPub outgoing federation module.
  21. """
  22. @doc """
  23. Enqueue publishing a single activity.
  24. """
  25. @spec enqueue_one(map(), Keyword.t()) :: {:ok, %Oban.Job{}}
  26. def enqueue_one(%{} = params, worker_args \\ []) do
  27. PublisherWorker.enqueue(
  28. "publish_one",
  29. %{"params" => params},
  30. worker_args
  31. )
  32. end
  33. @doc """
  34. Gathers a set of remote users given an IR envelope.
  35. """
  36. def remote_users(%User{id: user_id}, %{data: %{"to" => to} = data}) do
  37. cc = Map.get(data, "cc", [])
  38. bcc =
  39. data
  40. |> Map.get("bcc", [])
  41. |> Enum.reduce([], fn ap_id, bcc ->
  42. case Pleroma.List.get_by_ap_id(ap_id) do
  43. %Pleroma.List{user_id: ^user_id} = list ->
  44. {:ok, following} = Pleroma.List.get_following(list)
  45. bcc ++ Enum.map(following, & &1.ap_id)
  46. _ ->
  47. bcc
  48. end
  49. end)
  50. [to, cc, bcc]
  51. |> Enum.concat()
  52. |> Enum.map(&User.get_cached_by_ap_id/1)
  53. |> Enum.filter(fn user -> user && !user.local end)
  54. end
  55. @doc """
  56. Determine if an activity can be represented by running it through Transmogrifier.
  57. """
  58. def representable?(%Activity{} = activity) do
  59. with {:ok, _data} <- Transmogrifier.prepare_outgoing(activity.data) do
  60. true
  61. else
  62. _e ->
  63. false
  64. end
  65. end
  66. @doc """
  67. Publish a single message to a peer. Takes a struct with the following
  68. parameters set:
  69. * `inbox`: the inbox to publish to
  70. * `json`: the JSON message body representing the ActivityPub message
  71. * `actor`: the actor which is signing the message
  72. * `id`: the ActivityStreams URI of the message
  73. """
  74. def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
  75. Logger.debug("Federating #{id} to #{inbox}")
  76. uri = %{path: path} = URI.parse(inbox)
  77. digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
  78. date = Pleroma.Signature.signed_date()
  79. signature =
  80. Pleroma.Signature.sign(actor, %{
  81. "(request-target)": "post #{path}",
  82. host: signature_host(uri),
  83. "content-length": byte_size(json),
  84. digest: digest,
  85. date: date
  86. })
  87. with {:ok, %{status: code}} = result when code in 200..299 <-
  88. HTTP.post(
  89. inbox,
  90. json,
  91. [
  92. {"Content-Type", "application/activity+json"},
  93. {"Date", date},
  94. {"signature", signature},
  95. {"digest", digest}
  96. ]
  97. ) do
  98. if not Map.has_key?(params, :unreachable_since) || params[:unreachable_since] do
  99. Instances.set_reachable(inbox)
  100. end
  101. result
  102. else
  103. {_post_result, %{status: code} = response} = e ->
  104. unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
  105. Logger.metadata(activity: id, inbox: inbox, status: code)
  106. Logger.error("Publisher failed to inbox #{inbox} with status #{code}")
  107. case response do
  108. %{status: 403} -> {:discard, :forbidden}
  109. %{status: 404} -> {:discard, :not_found}
  110. %{status: 410} -> {:discard, :not_found}
  111. _ -> {:error, e}
  112. end
  113. {:error, :pool_full} ->
  114. Logger.debug("Publisher snoozing worker job due to full connection pool")
  115. {:snooze, 30}
  116. e ->
  117. unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
  118. Logger.metadata(activity: id, inbox: inbox)
  119. Logger.error("Publisher failed to inbox #{inbox} #{inspect(e)}")
  120. {:error, e}
  121. end
  122. end
  123. def publish_one(%{actor_id: actor_id} = params) do
  124. actor = User.get_cached_by_id(actor_id)
  125. params
  126. |> Map.delete(:actor_id)
  127. |> Map.put(:actor, actor)
  128. |> publish_one()
  129. end
  130. defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
  131. if port == URI.default_port(scheme) do
  132. host
  133. else
  134. "#{host}:#{port}"
  135. end
  136. end
  137. defp should_federate?(inbox, public) do
  138. if public do
  139. true
  140. else
  141. %{host: host} = URI.parse(inbox)
  142. quarantined_instances =
  143. Config.get([:instance, :quarantined_instances], [])
  144. |> Pleroma.Web.ActivityPub.MRF.instance_list_from_tuples()
  145. |> Pleroma.Web.ActivityPub.MRF.subdomains_regex()
  146. !Pleroma.Web.ActivityPub.MRF.subdomain_match?(quarantined_instances, host)
  147. end
  148. end
  149. @spec recipients(User.t(), Activity.t()) :: [[User.t()]]
  150. defp recipients(actor, activity) do
  151. followers =
  152. if actor.follower_address in activity.recipients do
  153. User.get_external_followers(actor)
  154. else
  155. []
  156. end
  157. fetchers =
  158. with %Activity{data: %{"type" => "Delete"}} <- activity,
  159. %Object{id: object_id} <- Object.normalize(activity, fetch: false),
  160. fetchers <- User.get_delivered_users_by_object_id(object_id),
  161. _ <- Delivery.delete_all_by_object_id(object_id) do
  162. fetchers
  163. else
  164. _ ->
  165. []
  166. end
  167. mentioned = remote_users(actor, activity)
  168. non_mentioned = (followers ++ fetchers) -- mentioned
  169. [mentioned, non_mentioned]
  170. end
  171. defp get_cc_ap_ids(ap_id, recipients) do
  172. host = Map.get(URI.parse(ap_id), :host)
  173. recipients
  174. |> Enum.filter(fn %User{ap_id: ap_id} -> Map.get(URI.parse(ap_id), :host) == host end)
  175. |> Enum.map(& &1.ap_id)
  176. end
  177. defp maybe_use_sharedinbox(%User{shared_inbox: nil, inbox: inbox}), do: inbox
  178. defp maybe_use_sharedinbox(%User{shared_inbox: shared_inbox}), do: shared_inbox
  179. @doc """
  180. Determine a user inbox to use based on heuristics. These heuristics
  181. are based on an approximation of the ``sharedInbox`` rules in the
  182. [ActivityPub specification][ap-sharedinbox].
  183. Please do not edit this function (or its children) without reading
  184. the spec, as editing the code is likely to introduce some breakage
  185. without some familiarity.
  186. [ap-sharedinbox]: https://www.w3.org/TR/activitypub/#shared-inbox-delivery
  187. """
  188. def determine_inbox(
  189. %Activity{data: activity_data},
  190. %User{inbox: inbox} = user
  191. ) do
  192. to = activity_data["to"] || []
  193. cc = activity_data["cc"] || []
  194. type = activity_data["type"]
  195. cond do
  196. type == "Delete" ->
  197. maybe_use_sharedinbox(user)
  198. Pleroma.Constants.as_public() in to || Pleroma.Constants.as_public() in cc ->
  199. maybe_use_sharedinbox(user)
  200. length(to) + length(cc) > 1 ->
  201. maybe_use_sharedinbox(user)
  202. true ->
  203. inbox
  204. end
  205. end
  206. @doc """
  207. Publishes an activity with BCC to all relevant peers.
  208. """
  209. def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
  210. when is_list(bcc) and bcc != [] do
  211. public = public?(activity)
  212. {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
  213. [priority_recipients, recipients] = recipients(actor, activity)
  214. inboxes =
  215. [priority_recipients, recipients]
  216. |> Enum.map(fn recipients ->
  217. recipients
  218. |> Enum.map(fn %User{} = user ->
  219. determine_inbox(activity, user)
  220. end)
  221. |> Enum.uniq()
  222. |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
  223. |> Instances.filter_reachable()
  224. end)
  225. Repo.checkout(fn ->
  226. Enum.each(inboxes, fn inboxes ->
  227. Enum.each(inboxes, fn {inbox, unreachable_since} ->
  228. %User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end)
  229. # Get all the recipients on the same host and add them to cc. Otherwise, a remote
  230. # instance would only accept a first message for the first recipient and ignore the rest.
  231. cc = get_cc_ap_ids(ap_id, recipients)
  232. json =
  233. data
  234. |> Map.put("cc", cc)
  235. |> Jason.encode!()
  236. __MODULE__.enqueue_one(%{
  237. inbox: inbox,
  238. json: json,
  239. actor_id: actor.id,
  240. id: activity.data["id"],
  241. unreachable_since: unreachable_since
  242. })
  243. end)
  244. end)
  245. end)
  246. end
  247. # Publishes an activity to all relevant peers.
  248. def publish(%User{} = actor, %Activity{} = activity) do
  249. public = public?(activity)
  250. if public && Config.get([:instance, :allow_relay]) do
  251. Logger.debug(fn -> "Relaying #{activity.data["id"]} out" end)
  252. Relay.publish(activity)
  253. end
  254. {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
  255. json = Jason.encode!(data)
  256. [priority_inboxes, inboxes] =
  257. recipients(actor, activity)
  258. |> Enum.map(fn recipients ->
  259. recipients
  260. |> Enum.map(fn %User{} = user ->
  261. determine_inbox(activity, user)
  262. end)
  263. |> Enum.uniq()
  264. |> Enum.filter(fn inbox -> should_federate?(inbox, public) end)
  265. end)
  266. inboxes = inboxes -- priority_inboxes
  267. [{priority_inboxes, 0}, {inboxes, 1}]
  268. |> Enum.each(fn {inboxes, priority} ->
  269. inboxes
  270. |> Instances.filter_reachable()
  271. |> Enum.each(fn {inbox, unreachable_since} ->
  272. __MODULE__.enqueue_one(
  273. %{
  274. inbox: inbox,
  275. json: json,
  276. actor_id: actor.id,
  277. id: activity.data["id"],
  278. unreachable_since: unreachable_since
  279. },
  280. priority: priority
  281. )
  282. end)
  283. end)
  284. :ok
  285. end
  286. def gather_webfinger_links(%User{} = user) do
  287. [
  288. %{"rel" => "self", "type" => "application/activity+json", "href" => user.ap_id},
  289. %{
  290. "rel" => "self",
  291. "type" => "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"",
  292. "href" => user.ap_id
  293. },
  294. %{
  295. "rel" => "http://ostatus.org/schema/1.0/subscribe",
  296. "template" => "#{Pleroma.Web.Endpoint.url()}/ostatus_subscribe?acct={uri}"
  297. }
  298. ]
  299. end
  300. def gather_nodeinfo_protocol_names, do: ["activitypub"]
  301. end