logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

fed_registry.ex (5311B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  defmodule Pleroma.Web.FedSockets.FedRegistry do
    @moduledoc """
    The FedRegistry stores the active FedSockets for quick retrieval.

    The storage and retrieval portion of the FedRegistry is done in process through
    Elixir's `Registry` module for speed and its ability to monitor for terminated processes.

    Dropped connections will be caught by `Registry` and deleted. Since the next
    message will initiate a new connection there is no reason to try and reconnect at that point.

    Origins that refused a connection are remembered separately in the
    `:fed_socket_rejections` cache so they are not retried until the entry expires.

    Normally outside modules should have no need to call or use the FedRegistry themselves.
    """

    alias Pleroma.Web.FedSockets.FedSocket
    alias Pleroma.Web.FedSockets.SocketInfo

    require Logger

    # Fallback rejection window (15 minutes, in milliseconds) used when
    # `[:fed_sockets, :rejection_duration]` is not configured.
    @default_rejection_duration 15 * 60 * 1000
    @rejections :fed_socket_rejections

    @doc """
    Retrieves a FedSocket from the Registry given its origin.

    The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"

    Will return:
      * {:ok, fed_socket} for working FedSockets
      * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
      * {:error, some_reason} usually :missing for unknown origins
    """
    def get_fed_socket(origin) do
      case get_registry_data(origin) do
        {:ok, %{state: :connected} = socket_info} ->
          {:ok, socket_info}

        {:error, reason} ->
          {:error, reason}
      end
    end

    @doc """
    Adds a connected FedSocket to the Registry.

    The entry is registered under `origin` for the calling process.

    Will return:
      * {:ok, fed_socket} with the newly registered socket, or with the socket
        that was already registered for this origin (the new one is closed)
      * {:error, :error_adding_socket} if the socket could not be registered
    """
    def add_fed_socket(origin, pid \\ nil) do
      origin
      |> SocketInfo.build(pid)
      |> SocketInfo.connect()
      |> add_socket_info()
    end

    defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
      case Registry.register(FedSockets.Registry, origin, socket_info) do
        {:ok, _owner} ->
          clear_prior_rejection(origin)
          Logger.debug("fedsocket added: #{inspect(origin)}")
          {:ok, socket_info}

        {:error, {:already_registered, _pid}} ->
          # Another socket already serves this origin: drop the duplicate and
          # hand back the one that is registered.
          FedSocket.close(socket_info)
          fetch_existing_socket_info(origin)

        _ ->
          {:error, :error_adding_socket}
      end
    end

    # `Registry.lookup/2` returns a list of `{pid, value}` tuples, so the stored
    # socket info has to be unwrapped before it is returned to the caller.
    defp fetch_existing_socket_info(origin) do
      case Registry.lookup(FedSockets.Registry, origin) do
        [{_pid, existing_socket_info} | _] ->
          {:ok, existing_socket_info}

        [] ->
          # The previous owner went away between the failed register and the lookup.
          {:error, :error_adding_socket}
      end
    end

    @doc """
    Mark this origin as having rejected a connection attempt.

    This will keep it from getting additional connection attempts
    for a period of time specified in the config
    (`[:fed_sockets, :rejection_duration]`, 15 minutes by default).

    Returns {:ok, new_reg_data}. Raises if the rejection cannot be stored in the cache.
    """
    def set_host_rejected(uri) do
      new_reg_data =
        uri
        |> SocketInfo.origin()
        |> get_or_create_registry_data()
        |> set_to_rejected()
        |> save_registry_data()

      {:ok, new_reg_data}
    end

    @doc """
    Retrieves the FedRegistryData from the Registry given its origin.

    The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"

    Will return:
      * {:ok, fed_registry_data} for known origins
      * {:error, :rejected} for origins currently marked as rejected
      * {:error, :missing} for unknown origins
      * {:error, :cache_error} indicating some low level runtime issues
    """
    def get_registry_data(origin) do
      case Registry.lookup(FedSockets.Registry, origin) do
        [] ->
          if rejected?(origin) do
            Logger.debug("previously rejected fedsocket requested")
            {:error, :rejected}
          else
            {:error, :missing}
          end

        [{_pid, %{state: :connected} = socket_info}] ->
          {:ok, socket_info}

        _ ->
          {:error, :cache_error}
      end
    end

    @doc """
    Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
    """
    def list_all do
      # Rejected entries come last, so they win if an origin appears in both lists.
      Map.new(list_all_connected() ++ list_all_rejected())
    end

    # Selects `{origin, socket_info}` pairs for every entry in the Registry.
    defp list_all_connected do
      Registry.select(FedSockets.Registry, [{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
    end

    # Builds `{origin, socket_info}` pairs for every key in the rejection cache.
    defp list_all_rejected do
      {:ok, keys} = Cachex.keys(@rejections)

      {:ok, registry_data} =
        Cachex.execute(@rejections, fn worker ->
          Enum.map(keys, fn key -> {key, Cachex.get!(worker, key)} end)
        end)

      registry_data
    end

    defp clear_prior_rejection(origin), do: Cachex.del(@rejections, origin)

    # True when the rejection cache holds an entry for `origin`.
    defp rejected?(origin) do
      case Cachex.get(@rejections, origin) do
        {:ok, nil} -> false
        {:ok, _socket_info} -> true
      end
    end

    # Returns the registered socket info for `origin`, or a fresh one when none
    # is available. Every error from `get_registry_data/1` (including
    # `:rejected`, i.e. the origin is being rejected again) yields a fresh
    # struct so that `set_host_rejected/1` never crashes here.
    defp get_or_create_registry_data(origin) do
      case get_registry_data(origin) do
        {:ok, socket_info} ->
          socket_info

        {:error, _reason} ->
          %SocketInfo{origin: origin}
      end
    end

    defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
      # `Registry.update_value/3` returns `{new_value, old_value}` on success and
      # `:error` when the key is not registered to the calling process.
      {^socket_info, _previous} =
        Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)

      socket_info
    end

    defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
      rejection_expiration =
        Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)

      {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
      socket_info
    end

    defp set_to_rejected(%SocketInfo{} = socket_info),
      do: %SocketInfo{socket_info | state: :rejected}
  end