logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma

fetch_registry.ex (4032B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.FedSockets.FetchRegistry do
  5. @moduledoc """
  6. The FetchRegistry acts as a broker for fetch requests and return values.
  7. This allows calling processes to block while waiting for a reply.
  8. It doesn't impose its own process, instead using `Cachex` to handle fetches in process, allowing
  9. multi-threaded processes to avoid bottlenecking.
  10. Normally outside modules will have no need to call or use the FetchRegistry themselves.
  11. The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
  12. aren't necessary the following settings are used by default:
  13. config :pleroma, :fed_sockets,
  14. fed_socket_fetches: [
  15. default: 12_000,
  16. interval: 3_000,
  17. lazy: false
  18. ]
  19. """
  defmodule FetchRegistryData do
    @moduledoc """
    Record kept in the fetch cache for a single fetch request.

    * `uuid` - identifier the record is stored under
    * `sent_json` - the request payload that was registered
    * `received_json` - the reply payload, `nil` until a reply is registered
    * `sent_at` / `received_at` - monotonic timestamps in milliseconds;
      `received_at` stays `nil` while the fetch is pending
    """

    # Every field defaults to nil, so the bare atom list is sufficient.
    defstruct [:uuid, :sent_json, :received_json, :sent_at, :received_at]
  end
  27. alias Ecto.UUID
  28. require Logger
  29. @fetches :fed_socket_fetches
  @doc """
  Registers a JSON request with the FetchRegistry and returns the identifying UUID.

  A fresh registry record is built for `json`, written to the cache, and the
  UUID it was stored under is handed back to the caller.
  """
  def register_fetch(json) do
    stored = save_registry_data(new_registry_data(json))
    %FetchRegistryData{uuid: fetch_id} = stored
    fetch_id
  end
  @doc """
  Reports on the status of a fetch given the identifying UUID.

  Returns

    * `{:ok, registry_data}` - the stored `FetchRegistryData` once a reply has been recorded
    * `{:error, :waiting}` - the fetch is registered but no reply has arrived yet
    * `{:error, other_error}` - usually `:missing`, indicating a fetch that has timed out

  The record is left in the registry; use `pop_fetch/1` to remove it.
  """
  def check_fetch(uuid) do
    # Anything that is not `{:ok, %FetchRegistryData{}}` falls through unchanged.
    with {:ok, %FetchRegistryData{received_at: received_at} = entry} <-
           get_registry_data(uuid) do
      if is_nil(received_at), do: {:error, :waiting}, else: {:ok, entry}
    end
  end
  @doc """
  Retrieves the response to a fetch given the identifying UUID.

  When the fetch has completed, its record is deleted from the FetchRegistry
  and the received payload is returned.

  Returns

    * `{:ok, fetched_object}` - the received JSON of a completed fetch
    * `{:error, :waiting}` - the fetch is still pending (nothing is deleted)
    * `{:error, other_error}` - usually `:missing`, indicating a fetch that has timed out
  """
  def pop_fetch(uuid) do
    # Non-:ok results from check_fetch/1 are returned to the caller as-is.
    with {:ok, %FetchRegistryData{received_json: response}} <- check_fetch(uuid) do
      delete_registry_data(uuid)
      {:ok, response}
    end
  end
  @doc """
  Records that the reply to a fetch has arrived.

  Expects the UUID that was sent with the request along with the result `data`.

  Returns

    * the updated `FetchRegistryData` when the reply is stored for a pending fetch
    * the already-stored `FetchRegistryData`, unchanged, when a reply had been
      recorded before (a warning is logged)
    * `:error` when the registry lookup fails, e.g. the entry is missing
      (a warning is logged)
  """
  def register_fetch_received(uuid, data) do
    case get_registry_data(uuid) do
      {:error, _reason} ->
        Logger.warn("Error adding fetch to registry - #{uuid}")
        :error

      # Still pending: stamp the reply onto the record and write it back.
      {:ok, %FetchRegistryData{received_at: nil} = pending} ->
        save_registry_data(set_fetch_received(pending, data))

      # A reply is already on record; keep the first one.
      {:ok, %FetchRegistryData{} = completed} ->
        Logger.warn("tried to add fetched data twice - #{uuid}")
        completed
    end
  end
  # Builds an unsaved registry record for `json`: a newly generated UUID, the
  # request payload, and the current monotonic time in milliseconds as `sent_at`.
  defp new_registry_data(json) do
    fetch_id = UUID.generate()
    sent_ms = System.monotonic_time(:millisecond)

    %FetchRegistryData{uuid: fetch_id, sent_json: json, sent_at: sent_ms}
  end
  # Looks up the cache entry stored under `origin`.
  #
  # Returns `{:ok, entry}` on a hit, `{:error, :missing}` when the cache holds
  # nothing for the key, and `{:error, :cache_error}` for any other reply.
  defp get_registry_data(origin) do
    @fetches
    |> Cachex.get(origin)
    |> case do
      {:ok, nil} -> {:error, :missing}
      {:ok, _entry} = hit -> hit
      _other -> {:error, :cache_error}
    end
  end
  # Returns a copy of the record with the reply payload attached and
  # `received_at` set to the current monotonic time in milliseconds.
  defp set_fetch_received(%FetchRegistryData{} = entry, data) do
    received_ms = System.monotonic_time(:millisecond)

    %FetchRegistryData{entry | received_json: data, received_at: received_ms}
  end
  # Writes the record to the cache under its own UUID and returns the record.
  # The match asserts a `{:ok, true}` reply; any other reply raises a MatchError.
  defp save_registry_data(%FetchRegistryData{} = entry) do
    {:ok, true} = Cachex.put(@fetches, entry.uuid, entry)
    entry
  end
  # Removes the cache entry stored under `fetch_id`.
  # The match asserts a `{:ok, true}` reply; any other reply raises a MatchError.
  defp delete_registry_data(fetch_id) do
    {:ok, true} = Cachex.del(@fetches, fetch_id)
  end
  122. end