logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

connections.ex (7027B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      defmodule Pleroma.Pool.Connections do
        @moduledoc """
        GenServer holding the bookkeeping state of a pool of Gun connections.

        Connections live in the `conns` map under string keys; `checkin/2` and the
        `:gun_up` handler build those keys as `"scheme:host:port"`. Every stored
        value is expected to expose the fields `conn` (the connection pid),
        `gun_state`, `conn_state`, `used_by`, `last_reference`, `crf` and `retries`
        (presumably a `Pleroma.Gun.Conn` struct - confirm against that module).

        The process monitors every connection pid it is given and reacts to the
        `:gun_up`, `:gun_down` and `:DOWN` messages it receives for them.
        """

        use GenServer

        alias Pleroma.Config
        alias Pleroma.Gun

        require Logger

        @type domain :: String.t()
        @type conn :: Pleroma.Gun.Conn.t()

        @type t :: %__MODULE__{
                conns: %{domain() => conn()},
                opts: keyword()
              }

        defstruct conns: %{}, opts: []

        # Client API

        @doc """
        Starts the pool process registered under `name`.

        `opts` are stored untouched in the `opts` field of the state.
        """
        @spec start_link({atom(), keyword()}) :: GenServer.on_start()
        def start_link({name, opts}) do
          GenServer.start_link(__MODULE__, opts, name: name)
        end

        @impl true
        def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}

        @doc """
        Asks pool `name` for a usable connection to `url`.

        Returns the connection pid when the entry stored under the
        `"scheme:host:port"` key of `url` has `gun_state: :up`, and `nil` otherwise.
        On success the caller is recorded in the connection's `used_by` list.

        The call times out after the `[:connections_pool, :checkin_timeout]`
        setting (250 when unset).
        """
        @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
        def checkin(url, name)
        def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)

        def checkin(%URI{} = uri, name) do
          timeout = Config.get([:connections_pool, :checkin_timeout], 250)

          GenServer.call(name, {:checkin, uri}, timeout)
        end

        @doc """
        Returns `true` when a process is registered under `name` and is alive.
        """
        @spec alive?(atom()) :: boolean()
        def alive?(name) do
          case Process.whereis(name) do
            nil -> false
            pid -> Process.alive?(pid)
          end
        end

        @doc """
        Returns the full state struct of pool `name`.
        """
        @spec get_state(atom()) :: t()
        def get_state(name) do
          GenServer.call(name, :state)
        end

        @doc """
        Returns the number of connections stored in pool `name` (0 when empty).
        """
        @spec count(atom()) :: non_neg_integer()
        def count(name) do
          GenServer.call(name, :count)
        end

        @doc """
        Returns the `{key, conn}` pairs of pool `name` that are idle and unused.

        A connection qualifies when its `conn_state` is `:idle` and its `used_by`
        list is empty. The result is ordered by ascending `crf`, ties broken by
        ascending `last_reference`.
        """
        @spec get_unused_conns(atom()) :: [{domain(), conn()}]
        def get_unused_conns(name) do
          GenServer.call(name, :unused_conns)
        end

        @doc """
        Tells pool `name` that process `pid` no longer uses connection `conn`.

        Asynchronous; always returns `:ok`.
        """
        @spec checkout(pid(), pid(), atom()) :: :ok
        def checkout(conn, pid, name) do
          GenServer.cast(name, {:checkout, conn, pid})
        end

        @doc """
        Stores `conn` under `key` in pool `name` and starts monitoring `conn.conn`.

        Asynchronous; always returns `:ok`.
        """
        @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
        def add_conn(name, key, conn) do
          GenServer.cast(name, {:add_conn, key, conn})
        end

        @doc """
        Removes the connection stored under `key` from pool `name`.

        Asynchronous; always returns `:ok`.
        """
        @spec remove_conn(atom(), String.t()) :: :ok
        def remove_conn(name, key) do
          GenServer.cast(name, {:remove_conn, key})
        end

        @doc """
        Computes the next `crf` score of a connection: `1 + 0.5^(current / steps) * crf`.

        `current` is the time elapsed since the previous reference, `steps` scales
        how fast the old score decays and `crf` is the previous score. The result
        is always a float. (The name presumably stands for "combined recency and
        frequency" - confirm.)
        """
        @spec crf(number(), number(), number()) :: float()
        def crf(current, steps, crf) do
          1 + :math.pow(0.5, current / steps) * crf
        end

        # Server callbacks

        # Stores the connection and monitors its pid so a :DOWN message arrives
        # when the connection process terminates.
        @impl true
        def handle_cast({:add_conn, key, conn}, state) do
          state = put_in(state.conns[key], conn)

          Process.monitor(conn.conn)
          {:noreply, state}
        end

        # Drops `pid` from the connection's users; the connection becomes :idle once
        # nobody uses it any more.
        @impl true
        def handle_cast({:checkout, conn_pid, pid}, state) do
          state =
            with true <- Process.alive?(conn_pid),
                 {key, conn} <- find_conn(state.conns, conn_pid) do
              # `used_by` holds GenServer `from` tuples, hence the match on element 0.
              used_by = List.keydelete(conn.used_by, pid, 0)
              conn_state = if used_by == [], do: :idle, else: conn.conn_state

              put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
            else
              false ->
                Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
                state

              nil ->
                Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
                state
            end

          {:noreply, state}
        end

        @impl true
        def handle_cast({:remove_conn, key}, state) do
          {:noreply, drop_conn(state, key)}
        end

        # Hands out the connection for `uri` when it is up, refreshing its
        # bookkeeping (last reference time, crf score, users).
        @impl true
        def handle_call({:checkin, uri}, from, state) do
          key = "#{uri.scheme}:#{uri.host}:#{uri.port}"

          case state.conns[key] do
            %{conn: pid, gun_state: :up} = conn ->
              now = :os.system_time(:second)
              crf = crf(now - conn.last_reference, 100, conn.crf)

              updated = %{
                conn
                | last_reference: now,
                  crf: crf,
                  conn_state: :active,
                  used_by: [from | conn.used_by]
              }

              {:reply, pid, put_in(state.conns[key], updated)}

            # Unknown key, or a connection in any state other than :up. Answering
            # nil instead of raising keeps the pool (and all its state) alive.
            _missing_or_not_up ->
              {:reply, nil, state}
          end
        end

        @impl true
        def handle_call(:state, _from, state), do: {:reply, state, state}

        @impl true
        def handle_call(:count, _from, state) do
          {:reply, map_size(state.conns), state}
        end

        @impl true
        def handle_call(:unused_conns, _from, state) do
          unused_conns =
            state.conns
            |> Enum.filter(&unused?/1)
            # Lexicographic key: a consistent ordering even when crf and
            # last_reference disagree between two connections.
            |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)

          {:reply, unused_conns, state}
        end

        # Marks the connection as up again. A connection that is not stored under
        # the key derived from Gun's own info is closed.
        @impl true
        def handle_info({:gun_up, conn_pid, _protocol}, state) do
          %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)

          # IP address tuples are rendered in textual form; anything :inet.ntoa/1
          # rejects is used as is.
          host =
            case :inet.ntoa(host) do
              {:error, :einval} -> host
              ip -> ip
            end

          key = "#{scheme}:#{host}:#{port}"

          state =
            case find_conn(state.conns, conn_pid, key) do
              {key, conn} ->
                if Process.alive?(conn_pid) do
                  put_in(state.conns[key], %{
                    conn
                    | gun_state: :up,
                      conn_state: :active,
                      retries: 0
                  })
                else
                  drop_conn(state, key)
                end

              nil ->
                :ok = Gun.close(conn_pid)

                state
            end

          {:noreply, state}
        end

        # Counts a lost connection against the [:connections_pool, :retry] budget
        # (1 when unset) and closes it once the budget is used up.
        @impl true
        def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
          retries = Config.get([:connections_pool, :retry], 1)

          # The connection is looked up by pid alone: Gun.info/1 is not consulted
          # here because the pid may already be dead.
          state =
            case find_conn(state.conns, conn_pid) do
              {key, conn} ->
                cond do
                  not Process.alive?(conn_pid) ->
                    drop_conn(state, key)

                  # `>=` rather than `==`: the limit is still honoured when the
                  # counter is already above it (e.g. the setting was lowered).
                  conn.retries >= retries ->
                    :ok = Gun.close(conn.conn)

                    drop_conn(state, key)

                  true ->
                    put_in(state.conns[key], %{
                      conn
                      | gun_state: :down,
                        retries: conn.retries + 1
                    })
                end

              nil ->
                Logger.debug(":gun_down for conn which isn't found in state")

                state
            end

          {:noreply, state}
        end

        # A monitored connection process terminated: send every process still
        # using it an exit signal with the same reason and forget the connection.
        @impl true
        def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
          Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")

          state =
            case find_conn(state.conns, conn_pid) do
              {key, conn} ->
                Enum.each(conn.used_by, fn {pid, _ref} ->
                  Process.exit(pid, reason)
                end)

                drop_conn(state, key)

              nil ->
                Logger.debug(":DOWN for conn which isn't found in state")

                state
            end

          {:noreply, state}
        end

        # Private helpers

        # True for connections that are idle and have no users.
        defp unused?({_key, %{conn_state: :idle, used_by: []}}), do: true
        defp unused?(_), do: false

        # Returns the state without the connection stored under `key`.
        defp drop_conn(state, key) do
          %{state | conns: Map.delete(state.conns, key)}
        end

        # Finds the `{key, conn}` pair whose connection pid is `conn_pid`, or nil.
        defp find_conn(conns, conn_pid) do
          Enum.find(conns, fn {_key, conn} ->
            conn.conn == conn_pid
          end)
        end

        # Same as find_conn/2, but only accepts the connection stored under
        # `conn_key`; a direct lookup, since map keys are unique.
        defp find_conn(conns, conn_pid, conn_key) do
          case conns do
            %{^conn_key => %{conn: ^conn_pid} = conn} -> {conn_key, conn}
            _ -> nil
          end
        end
      end
    283 end