# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.FedSockets do
  @moduledoc """
  This documents the FedSockets framework, a framework for federating
  ActivityPub objects between servers via persistent WebSocket connections.

  FedSockets allow servers to authenticate on first contact and maintain that
  connection, eliminating the need to authenticate every time data needs to be shared.

  ## Protocol
  FedSockets currently support 2 types of data transfer:
  * `publish` method, which doesn't require a response
  * `fetch` method, which requires a response to be sent

  ### Publish
  The publish operation sends a JSON-encoded map of the shape:

      %{action: :publish, data: json}

  and accepts (but does not require) a reply of the form:

      %{"action" => "publish_reply"}

  The outgoing params represent
  * data: an ActivityPub object encoded into JSON

  ### Fetch
  The fetch operation sends a JSON-encoded map of the shape:

      %{action: :fetch, data: id, uuid: fetch_uuid}

  and requires a reply of the form:

      %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}

  The outgoing params represent
  * id: an ActivityPub object URI
  * uuid: a unique uuid generated by the sender

  The reply params represent
  * data: an ActivityPub object encoded into JSON
  * uuid: the uuid sent along with the fetch request
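
  The pairing of a fetch request with its reply can be sketched as follows. This is
  a minimal illustration and not the FedSockets implementation: `FetchMessageSketch`,
  `request/2` and `match_reply/2` are hypothetical names, and the maps are shown
  before JSON encoding (request) and after JSON decoding (reply).

  ```elixir
  defmodule FetchMessageSketch do
    # Hypothetical helper: builds the outgoing fetch map in the shape given above.
    def request(id, uuid), do: %{action: :fetch, data: id, uuid: uuid}

    # A reply belongs to a request only when it is a fetch_reply carrying the
    # same uuid. Reusing the `uuid` variable in both patterns enforces equality.
    def match_reply(%{uuid: uuid}, %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}),
      do: {:ok, data}

    # Anything else (wrong action, different uuid, missing keys) is not a match.
    def match_reply(_request, _reply), do: :no_match
  end
  ```

  Matching on the uuid rather than on the object id lets several fetches for the
  same object be outstanding on one connection without their replies being confused.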

  ## Examples
  Clients of FedSocket transfers shouldn't need to use any functions outside of this module.

  A typical publish operation can be performed with the following code; a fetch operation works in a similar manner.

      case FedSockets.get_or_create_fed_socket(inbox) do
        {:ok, fedsocket} ->
          FedSockets.publish(fedsocket, json)

        _ ->
          alternative_publish(inbox, actor, json, params)
      end
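
  A fetch with a fallback might look like the sketch below. It is an assumption-laden
  illustration: `FetchWithFallbackSketch` is a hypothetical module, `sockets` stands
  for any module exposing `get_or_create_fed_socket/1` and `fetch/2` (this module in
  a running instance), and `fallback` is a hypothetical one-argument function such as
  a plain HTTP fetch. Treating a `nil` object as a reason to fall back is a choice
  made for this sketch only.

  ```elixir
  defmodule FetchWithFallbackSketch do
    # Tries the FedSocket path first and calls `fallback` when no socket is
    # available, the fetch times out, or the remote side returns no object.
    def fetch(sockets, id, fallback) do
      with {:ok, fed_socket} <- sockets.get_or_create_fed_socket(id),
           {:ok, object} when not is_nil(object) <- sockets.fetch(fed_socket, id) do
        {:ok, object}
      else
        _ -> fallback.(id)
      end
    end
  end
  ```

  Passing the module and the fallback as arguments keeps the sketch independent of
  the rest of the application, so it can be exercised with a stub in tests.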

  ## Configuration
  FedSockets have the following config settings:

      config :pleroma, :fed_sockets,
        enabled: true,
        ping_interval: :timer.seconds(15),
        connection_duration: :timer.hours(1),
        rejection_duration: :timer.hours(1),
        fed_socket_fetches: [
          default: 12_000,
          interval: 3_000,
          lazy: false
        ]

  * enabled - turns FedSockets on or off. Can be toggled at runtime.
  * connection_duration - how long a FedSocket can sit idle before it's culled.
  * rejection_duration - after a failed FedSocket connection attempt, the host is excluded
    from further connections for this amount of time.
  * fed_socket_fetches - options passed to the Cachex queue backing the FetchRegistry.
  * fed_socket_rejections - options passed to the Cachex queue backing the FedRegistry.

  Cachex options are
  * default: the minimum amount of time a fetch can wait before it times out.
  * interval: the interval between checks for timed-out entries. This plus the default represents the maximum time allowed.
  * lazy: leave at false for consistent and fast lookups; set to true for stricter timeout enforcement.
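
  The relationship between `default` and `interval` can be worked through with a
  small sketch. `FetchTimeoutSketch` and `window/1` are hypothetical names used only
  for illustration; the arithmetic follows the description of the options above and
  does not inspect Cachex itself.

  ```elixir
  defmodule FetchTimeoutSketch do
    # Returns {minimum, maximum} wait in milliseconds for a fetch entry,
    # assuming the entry is removed on the first check after `default` elapses.
    def window(opts) do
      default = Keyword.fetch!(opts, :default)
      interval = Keyword.fetch!(opts, :interval)
      {default, default + interval}
    end
  end
  ```

  With the settings shown above (`default: 12_000`, `interval: 3_000`), a fetch
  would wait between 12 and 15 seconds before timing out.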
  """

  require Logger

  alias Pleroma.Web.FedSockets.FedRegistry
  alias Pleroma.Web.FedSockets.FedSocket
  alias Pleroma.Web.FedSockets.SocketInfo

  @doc """
  Returns a FedSocket for the given origin. Will reuse an existing one or create a new one.

  The address is expected to be a fully formed URL such as:
  "http://www.example.com" or "http://www.example.com:8080"

  It can, and usually does, include additional path parameters,
  but these are ignored as FedSockets are organized by host and port info alone.
  """
  def get_or_create_fed_socket(address) do
    with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
         {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
         {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
      Logger.debug("fedsocket created for - #{inspect(address)}")
      {:ok, fed_socket}
    else
      {:cache, {:ok, socket}} ->
        Logger.debug("fedsocket found in cache - #{inspect(address)}")
        {:ok, socket}

      {:cache, {:error, :rejected} = e} ->
        e

      {:connect, {:error, _host}} ->
        Logger.debug("set host rejected for - #{inspect(address)}")
        FedRegistry.set_host_rejected(address)
        {:error, :rejected}

      {_, {:error, :disabled}} ->
        {:error, :disabled}

      {_, {:error, reason}} ->
        Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
        {:error, reason}
    end
  end

  @doc """
  Returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.

  The address is expected to be a fully formed URL such as:
  "http://www.example.com" or "http://www.example.com:8080"
  """
  def get_fed_socket(address) do
    origin = SocketInfo.origin(address)

    with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
         {:ok, socket} <- FedRegistry.get_fed_socket(origin) do
      {:ok, socket}
    else
      {:config, _} ->
        {:error, :disabled}

      {:error, :rejected} ->
        Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
        {:error, :rejected}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Sends the supplied data via the publish protocol.
  It will not block waiting for a reply.
  Returns :ok, but this is not an indication of a successful transfer.

  The data is expected to be JSON-encoded binary data.
  """
  def publish(%SocketInfo{} = fed_socket, json) do
    FedSocket.publish(fed_socket, json)
  end

  @doc """
  Sends the supplied data via the fetch protocol.
  It will block waiting for a reply or a timeout.
  Returns {:ok, object}, where object is the requested object (or nil),
  or {:error, :timeout} if the message was not responded to.

  The id is expected to be the URI of an ActivityPub object.
  """
  def fetch(%SocketInfo{} = fed_socket, id) do
    FedSocket.fetch(fed_socket, id)
  end

  @doc """
  Disconnect all and restart FedSockets.
  This is mainly used in development and testing but could be useful in production.
  """
  def reset do
    FedRegistry
    |> Process.whereis()
    |> Process.exit(:testing)
  end

  def uri_for_origin(origin),
    do: "ws://#{origin}/api/fedsocket/v1"
end