connections.ex (7027B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Pool.Connections do
  @moduledoc """
  GenServer that keeps track of pooled connections.

  The server state is a `%Pleroma.Pool.Connections{}` struct whose `conns` map
  is keyed by `"scheme:host:port"` strings. Each value is a connection record
  with (at least) the fields this module reads and updates: `conn` (the
  connection pid), `gun_state`, `conn_state`, `used_by`, `last_reference`,
  `crf` and `retries`.

  The process monitors every connection pid it is given and reacts to
  `:gun_up`, `:gun_down` and `:DOWN` messages for those pids.
  """

  use GenServer

  alias Pleroma.Config
  alias Pleroma.Gun

  require Logger

  @type domain :: String.t()
  @type conn :: Pleroma.Gun.Conn.t()

  @type t :: %__MODULE__{
          conns: %{domain() => conn()},
          opts: keyword()
        }

  defstruct conns: %{}, opts: []

  # Client API

  @doc """
  Starts the pool process registered under `name`, storing `opts` in its state.
  """
  @spec start_link({atom(), keyword()}) :: {:ok, pid()}
  def start_link({name, opts}) do
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Asks the pool `name` for a connection to `url`.

  Returns the connection pid when the pool holds a connection for the URL's
  `scheme:host:port` whose `gun_state` is `:up`; returns `nil` otherwise.
  On success the caller is recorded in the connection's `used_by` list.

  The call timeout (milliseconds) is read from
  `[:connections_pool, :checkin_timeout]`, with `250` passed as the fallback.
  """
  @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
  def checkin(url, name)
  def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)

  def checkin(%URI{} = uri, name) do
    timeout = Config.get([:connections_pool, :checkin_timeout], 250)

    GenServer.call(name, {:checkin, uri}, timeout)
  end

  @doc """
  Returns `true` when a process is registered under `name` and is alive.
  """
  @spec alive?(atom()) :: boolean()
  def alive?(name) do
    case Process.whereis(name) do
      nil -> false
      pid -> Process.alive?(pid)
    end
  end

  @doc """
  Returns the full state struct of the pool `name`.
  """
  @spec get_state(atom()) :: t()
  def get_state(name) do
    GenServer.call(name, :state)
  end

  @doc """
  Returns the number of connections stored in the pool `name` (0 when empty).
  """
  @spec count(atom()) :: non_neg_integer()
  def count(name) do
    GenServer.call(name, :count)
  end

  @doc """
  Returns the `{key, conn}` pairs whose `conn_state` is `:idle` and whose
  `used_by` list is empty, sorted ascending by `crf` and then by
  `last_reference`.
  """
  @spec get_unused_conns(atom()) :: [{domain(), conn()}]
  def get_unused_conns(name) do
    GenServer.call(name, :unused_conns)
  end

  @doc """
  Tells the pool `name` that `pid` no longer uses the connection `conn`.

  Asynchronous; always returns `:ok`.
  """
  @spec checkout(pid(), pid(), atom()) :: :ok
  def checkout(conn, pid, name) do
    GenServer.cast(name, {:checkout, conn, pid})
  end

  @doc """
  Stores `conn` under `key` in the pool `name` and starts monitoring its pid.

  Asynchronous; always returns `:ok`.
  """
  @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
  def add_conn(name, key, conn) do
    GenServer.cast(name, {:add_conn, key, conn})
  end

  @doc """
  Removes the connection stored under `key` from the pool `name`.

  Asynchronous; always returns `:ok`.
  """
  @spec remove_conn(atom(), String.t()) :: :ok
  def remove_conn(name, key) do
    GenServer.cast(name, {:remove_conn, key})
  end

  @doc """
  Computes the updated `crf` score: `1 + 0.5 ** (current / steps) * crf`.

  `current` is the time elapsed since the previous reference; the larger it
  is relative to `steps`, the less the previous `crf` contributes.
  """
  @spec crf(number(), number(), number()) :: float()
  def crf(current, steps, crf) do
    1 + :math.pow(0.5, current / steps) * crf
  end

  # Server callbacks

  @impl true
  def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}

  @impl true
  def handle_cast({:add_conn, key, conn}, state) do
    state = put_in(state.conns[key], conn)

    # A :DOWN message for this pid is handled in handle_info/2 below.
    Process.monitor(conn.conn)
    {:noreply, state}
  end

  @impl true
  def handle_cast({:checkout, conn_pid, pid}, state) do
    state =
      with true <- Process.alive?(conn_pid),
           {key, conn} <- find_conn(state.conns, conn_pid),
           # used_by holds the `from` tuples recorded at checkin; element 0 is the caller pid
           used_by <- List.keydelete(conn.used_by, pid, 0) do
        # The connection becomes idle only once nobody uses it anymore.
        conn_state = if used_by == [], do: :idle, else: conn.conn_state

        put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
      else
        false ->
          Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
          state

        nil ->
          Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
          state
      end

    {:noreply, state}
  end

  @impl true
  def handle_cast({:remove_conn, key}, state) do
    {:noreply, %{state | conns: Map.delete(state.conns, key)}}
  end

  @impl true
  def handle_call({:checkin, uri}, from, state) do
    key = "#{uri.scheme}:#{uri.host}:#{uri.port}"

    case state.conns[key] do
      %{conn: pid, gun_state: :up} = conn ->
        time = :os.system_time(:second)
        last_reference = time - conn.last_reference
        crf = crf(last_reference, 100, conn.crf)

        state =
          put_in(state.conns[key], %{
            conn
            | last_reference: time,
              crf: crf,
              conn_state: :active,
              used_by: [from | conn.used_by]
          })

        {:reply, pid, state}

      # No entry, or an entry that is not up (e.g. `gun_state: :down`): nothing
      # usable to hand out. A catch-all keeps an unexpected gun_state from
      # crashing the pool process with a CaseClauseError.
      _ ->
        {:reply, nil, state}
    end
  end

  @impl true
  def handle_call(:state, _from, state), do: {:reply, state, state}

  @impl true
  def handle_call(:count, _from, state) do
    {:reply, map_size(state.conns), state}
  end

  @impl true
  def handle_call(:unused_conns, _from, state) do
    unused_conns =
      state.conns
      |> Enum.filter(&unused_conn?/1)
      # Sort on a tuple so the ordering is total: lowest crf first, ties broken
      # by the oldest last_reference.
      |> Enum.sort_by(fn {_key, conn} -> {conn.crf, conn.last_reference} end)

    {:reply, unused_conns, state}
  end

  @impl true
  def handle_info({:gun_up, conn_pid, _protocol}, state) do
    %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)

    # The host may be an IP address tuple; render it as text when it is,
    # otherwise keep it as given.
    host =
      case :inet.ntoa(host) do
        {:error, :einval} -> host
        ip -> ip
      end

    key = "#{scheme}:#{host}:#{port}"

    state =
      with {key, conn} <- find_conn(state.conns, conn_pid, key),
           {true, key} <- {Process.alive?(conn_pid), key} do
        put_in(state.conns[key], %{
          conn
          | gun_state: :up,
            conn_state: :active,
            retries: 0
        })
      else
        # Known connection, but its process has already died: drop it.
        {false, key} ->
          %{state | conns: Map.delete(state.conns, key)}

        # Connection is not tracked under this key: close it.
        nil ->
          :ok = Gun.close(conn_pid)

          state
      end

    {:noreply, state}
  end

  @impl true
  def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
    retries = Config.get([:connections_pool, :retry], 1)

    # Gun.info/1 is not used here (the connection process may already be
    # dead), so the entry is looked up by pid only.
    state =
      with {key, conn} <- find_conn(state.conns, conn_pid),
           {true, key} <- {Process.alive?(conn_pid), key} do
        if conn.retries == retries do
          # Retry budget exhausted: close the connection and forget it.
          :ok = Gun.close(conn.conn)

          %{state | conns: Map.delete(state.conns, key)}
        else
          put_in(state.conns[key], %{
            conn
            | gun_state: :down,
              retries: conn.retries + 1
          })
        end
      else
        {false, key} ->
          %{state | conns: Map.delete(state.conns, key)}

        nil ->
          Logger.debug(":gun_down for conn which isn't found in state")

          state
      end

    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
    Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")

    state =
      with {key, conn} <- find_conn(state.conns, conn_pid) do
        # Propagate the exit reason to every process still using the connection.
        Enum.each(conn.used_by, fn {pid, _ref} ->
          Process.exit(pid, reason)
        end)

        %{state | conns: Map.delete(state.conns, key)}
      else
        nil ->
          Logger.debug(":DOWN for conn which isn't found in state")

          state
      end

    {:noreply, state}
  end

  # Private helpers

  # True for entries that are idle and not used by any process.
  defp unused_conn?({_key, %{conn_state: :idle, used_by: []}}), do: true
  defp unused_conn?(_), do: false

  # Finds the `{key, conn}` entry whose connection pid is `conn_pid`, or nil.
  defp find_conn(conns, conn_pid) do
    Enum.find(conns, fn {_key, conn} ->
      conn.conn == conn_pid
    end)
  end

  # Same as find_conn/2, but the entry must also be stored under `conn_key`.
  defp find_conn(conns, conn_pid, conn_key) do
    Enum.find(conns, fn {key, conn} ->
      key == conn_key and conn.conn == conn_pid
    end)
  end
end