pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

retry_queue.ex (6801B)


# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Federator.RetryQueue do
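  @moduledoc """
  In-memory retry queue for failed federation deliveries.

  Failed publishes are stored in a protected ETS table keyed by the Unix
  timestamp at which they become eligible for another attempt. A recurring
  timer starts worker tasks (up to the configured `:max_jobs`) for entries
  whose timestamp has passed; delivery and drop counters live in the
  server state.
  """
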
  use GenServer

  require Logger

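  # The queue itself is a protected ETS bag owned by this process; entries
  # are `{retry_at_unix_timestamp, job}` tuples.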
  def init(args) do
    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])

    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
  end

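  # The server only starts when the retry queue is enabled in config (it is
  # always enabled in the test environment); otherwise start_link returns
  # :ignore so the supervisor simply skips it.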
  def start_link(_) do
    enabled =
      if Pleroma.Config.get(:env) == :test,
        do: true,
        else: Pleroma.Config.get([__MODULE__, :enabled], false)

    if enabled do
      Logger.info("Starting retry queue")

      linkres =
        GenServer.start_link(
          __MODULE__,
          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
          name: __MODULE__
        )

      maybe_kickoff_timer()
      linkres
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

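  # Public entry point: schedule `data` for (re)delivery via `transport`.
  # The retry counter is incremented on enqueue, so a fresh job starts at 1.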
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  def get_stats do
    GenServer.call(__MODULE__, :get_stats)
  end

  def reset_stats do
    GenServer.call(__MODULE__, :reset_stats)
  end

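  # Decide whether a job should be retried (returning the Unix timestamp of
  # the next attempt) or dropped once `:max_retries` is exceeded.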
  def get_retry_params(retries) do
    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  def get_retry_timer_interval do
    Pleroma.Config.get([:retry_queue, :interval], 1000)
  end

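  # Count queue entries whose retry timestamp (the ETS key) is at or before
  # `current_time`.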
  defp ets_count_expires(table, current_time) do
    :ets.select_count(
      table,
      [
        {
          {:"$1", :"$2"},
          [{:"=<", :"$1", {:const, current_time}}],
          [true]
        }
      ]
    )
  end

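  # Select up to `desired` expired entries, delete them from the table and
  # return them. Callers check the expired count first, so the table is never
  # empty here and the `{popped, _continuation}` match always succeeds.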
  defp ets_pop_n_expired(table, current_time, desired) do
    {popped, _continuation} =
      :ets.select(
        table,
        [
          {
            {:"$1", :"$2"},
            [{:"=<", :"$1", {:const, current_time}}],
            [:"$_"]
          }
        ],
        desired
      )

    popped
    |> Enum.each(fn e ->
      :ets.delete_object(table, e)
    end)

    popped
  end

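  # Start worker tasks for due entries while there are free slots below
  # `:max_jobs`; returns the (possibly grown) set of monitor references.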
  def maybe_start_job(running_jobs, queue_table) do
    # we don't want to hit the ets or the DateTime more times than we have to
    # could optimize slightly further by not using the count, and instead grabbing
    # up to N objects early...
    current_time = DateTime.to_unix(DateTime.utc_now())
    n_running_jobs = :sets.size(running_jobs)

    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
      n_ready_jobs = ets_count_expires(queue_table, current_time)

      if n_ready_jobs > 0 do
        # figure out how many we could start
        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
      else
        running_jobs
      end
    else
      running_jobs
    end
  end

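  # Pop up to `available_job_slots` due entries and run each in a monitored
  # Task; the monitor references are folded into the running_jobs set.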
  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
    running_jobs
  end

  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
       when available_job_slots > 0 do
    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)

    candidates
    |> List.foldl(running_jobs, fn {_, e}, rj ->
      {:ok, pid} = Task.start(fn -> worker(e) end)
      mref = Process.monitor(pid)
      :sets.add_element(mref, rj)
    end)
  end

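  # Runs inside a worker Task: attempt a single delivery and either bump the
  # delivered counter or re-enqueue the job with its current retry count.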
  def worker({:send, data, transport, retries}) do
    case transport.publish_one(data) do
      {:ok, _} ->
        GenServer.cast(__MODULE__, :inc_delivered)
        :delivered

      {:error, _reason} ->
        enqueue(data, transport, retries)
        :retry
    end
  end

  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
  end

  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count},
     %{state | delivered: 0, dropped: 0}}
  end

  def handle_cast(:reset_stats, state) do
    {:noreply, %{state | delivered: 0, dropped: 0}}
  end

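  # A job is either inserted into the ETS table keyed by its next retry
  # timestamp, or dropped (and counted) once it has exhausted its retries.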
  def handle_cast(
        {:maybe_enqueue, data, transport, retries},
        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
        running_jobs = maybe_start_job(running_jobs, queue_table)
        {:noreply, %{state | running_jobs: running_jobs}}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

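  # Arms the retry timer; :retry_timer_run is delivered back to this process
  # after the configured interval.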
  def handle_cast(:kickoff_timer, state) do
    retry_interval = get_retry_timer_interval()
    Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
    {:noreply, state}
  end

  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
    {:noreply, %{state | delivered: delivery_count + 1}}
  end

  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
    {:noreply, %{state | dropped: drop_count + 1}}
  end

  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

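  # On each timer tick, re-arm the timer and start any jobs that have become
  # due in the meantime.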
  def handle_info(
        :retry_timer_run,
        %{queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    maybe_kickoff_timer()
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

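  # A worker task exited: drop its monitor reference from running_jobs and
  # try to fill the freed slot right away.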
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    %{running_jobs: running_jobs, queue_table: queue_table} = state
    running_jobs = :sets.del_element(ref, running_jobs)
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

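  # Backoff schedule, selected at compile time: in the test environment the
  # returned timestamp is already in the past so entries are retried
  # immediately; otherwise the delay grows with the cube of the retry count
  # on top of `:initial_timeout`.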
  if Pleroma.Config.get(:env) == :test do
    defp growth_function(_retries) do
      _initial_timeout = Pleroma.Config.get([__MODULE__, :initial_timeout])
      DateTime.to_unix(DateTime.utc_now()) - 1
    end
  else
    defp growth_function(retries) do
      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
        DateTime.to_unix(DateTime.utc_now())
    end
  end

  defp maybe_kickoff_timer do
    GenServer.cast(__MODULE__, :kickoff_timer)
  end
end