logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

connection_pool.ex (2594B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Gun.ConnectionPool do
  5. @registry __MODULE__
  6. alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
  7. def children do
  8. [
  9. {Registry, keys: :unique, name: @registry},
  10. Pleroma.Gun.ConnectionPool.WorkerSupervisor
  11. ]
  12. end
  13. @spec get_conn(URI.t(), keyword()) :: {:ok, pid()} | {:error, term()}
  14. def get_conn(uri, opts) do
  15. key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
  16. case Registry.lookup(@registry, key) do
  17. # The key has already been registered, but connection is not up yet
  18. [{worker_pid, nil}] ->
  19. get_gun_pid_from_worker(worker_pid, true)
  20. [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
  21. GenServer.call(worker_pid, :add_client)
  22. {:ok, gun_pid}
  23. [] ->
  24. # :gun.set_owner fails in :connected state for whatevever reason,
  25. # so we open the connection in the process directly and send it's pid back
  26. # We trust gun to handle timeouts by itself
  27. case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
  28. {:ok, worker_pid} ->
  29. get_gun_pid_from_worker(worker_pid, false)
  30. {:error, {:already_started, worker_pid}} ->
  31. get_gun_pid_from_worker(worker_pid, true)
  32. err ->
  33. err
  34. end
  35. end
  36. end
  37. defp get_gun_pid_from_worker(worker_pid, register) do
  38. # GenServer.call will block the process for timeout length if
  39. # the server crashes on startup (which will happen if gun fails to connect)
  40. # so instead we use cast + monitor
  41. ref = Process.monitor(worker_pid)
  42. if register, do: GenServer.cast(worker_pid, {:add_client, self()})
  43. receive do
  44. {:conn_pid, pid} ->
  45. Process.demonitor(ref)
  46. {:ok, pid}
  47. {:DOWN, ^ref, :process, ^worker_pid, reason} ->
  48. case reason do
  49. {:shutdown, {:error, _} = error} -> error
  50. {:shutdown, error} -> {:error, error}
  51. _ -> {:error, reason}
  52. end
  53. end
  54. end
  55. @spec release_conn(pid()) :: :ok
  56. def release_conn(conn_pid) do
  57. # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
  58. # worker_pid end)
  59. query_result =
  60. Registry.select(@registry, [
  61. {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
  62. ])
  63. case query_result do
  64. [worker_pid] ->
  65. GenServer.call(worker_pid, :remove_client)
  66. [] ->
  67. :ok
  68. end
  69. end
  70. end