retry_queue.ex (6801B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  Re-attempts deliveries whose `transport.publish_one/1` call failed.

  Pending jobs are stored in a protected ETS `:bag` as `{due_unix_seconds, job}`
  tuples. A periodic timer, every new enqueue, and every finished worker (observed
  through its `:DOWN` monitor message) call `maybe_start_job/2`, which starts at most
  `:max_jobs` concurrent worker tasks for jobs whose due time has passed.

  Settings are read through `Pleroma.Config` under this module's key: `:enabled`,
  `:max_jobs`, `:max_retries`, `:initial_timeout` and `:interval` (timer interval in
  milliseconds; the legacy `[:retry_queue, :interval]` key is still honoured, and the
  interval defaults to 1000).
  """

  use GenServer

  require Logger

  @impl true
  def init(args) do
    # NOTE(review): in an ETS `:bag`, inserting an object identical to one already
    # stored (same due time and same job tuple) is a no-op, so such duplicates collapse.
    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])

    # Schedule the first tick from inside the server so exactly one self-rescheduling
    # timer chain exists per server process. Kicking it off from `start_link/1` would
    # add an extra chain whenever `start_link/1` is called while the server is already
    # registered (`{:error, {:already_started, pid}}`).
    schedule_retry_timer()

    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
  end

  @doc """
  Starts the retry queue, registered under the module name.

  Always enabled in the `:test` environment; otherwise controlled by the `:enabled`
  setting (default `false`). Returns `:ignore` when disabled.
  """
  def start_link(_) do
    enabled =
      if Pleroma.Config.get(:env) == :test,
        do: true,
        else: Pleroma.Config.get([__MODULE__, :enabled], false)

    if enabled do
      Logger.info("Starting retry queue")

      GenServer.start_link(
        __MODULE__,
        %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
        name: __MODULE__
      )
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  @doc """
  Asynchronously queues `data` for another delivery attempt through `transport`.

  `retries` is incremented by one before the server checks it with
  `get_retry_params/1`.
  """
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  @doc "Returns the current `%{delivered: _, dropped: _}` counters."
  def get_stats do
    GenServer.call(__MODULE__, :get_stats)
  end

  @doc "Returns the `%{delivered: _, dropped: _}` counters and then zeroes them."
  def reset_stats do
    GenServer.call(__MODULE__, :reset_stats)
  end

  @doc """
  Decides what to do with a job whose retry count is `retries`.

  Returns `{:drop, "Max retries reached"}` once `retries` exceeds the `:max_retries`
  setting; otherwise `{:retry, due_time}`, where `due_time` is the unix time (seconds)
  at which the job becomes eligible to run.
  """
  def get_retry_params(retries) do
    # NOTE(review): if `:max_retries` is unset, `retries > nil` is always false under
    # Erlang term ordering (numbers sort before atoms), so jobs are never dropped.
    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  @doc """
  Returns the retry timer interval in milliseconds.

  Prefers `[#{inspect(__MODULE__)}, :interval]`, consistent with every other setting
  of this module, and falls back to the legacy `[:retry_queue, :interval]` key and
  then to 1000.
  """
  def get_retry_timer_interval do
    Pleroma.Config.get([__MODULE__, :interval]) ||
      Pleroma.Config.get([:retry_queue, :interval], 1000)
  end

  # Counts queued entries whose due time is at or before `current_time`.
  defp ets_count_expires(table, current_time) do
    :ets.select_count(
      table,
      [{{:"$1", :"$2"}, [{:"=<", :"$1", {:const, current_time}}], [true]}]
    )
  end

  # Removes and returns up to `desired` entries whose due time is at or before
  # `current_time`. Returns `[]` when none match.
  defp ets_pop_n_expired(table, current_time, desired) do
    match_spec = [{{:"$1", :"$2"}, [{:"=<", :"$1", {:const, current_time}}], [:"$_"]}]

    # `:ets.select/3` returns `:"$end_of_table"` (not a tuple) when nothing matches.
    popped =
      case :ets.select(table, match_spec, desired) do
        {matches, _continuation} -> matches
        :"$end_of_table" -> []
      end

    Enum.each(popped, &:ets.delete_object(table, &1))

    popped
  end

  @doc """
  Starts worker tasks for due jobs while free job slots remain.

  `running_jobs` is the `:sets` set of monitor references of running workers. The
  return value is that set extended with the references of any newly started
  workers.
  """
  def maybe_start_job(running_jobs, queue_table) do
    # Read the clock and the config once so every decision below uses the same values.
    current_time = DateTime.to_unix(DateTime.utc_now())
    max_jobs = Pleroma.Config.get([__MODULE__, :max_jobs])
    n_running_jobs = :sets.size(running_jobs)

    # The count check avoids popping from the table when nothing is due yet.
    if n_running_jobs < max_jobs and ets_count_expires(queue_table, current_time) > 0 do
      start_n_jobs(running_jobs, queue_table, current_time, max_jobs - n_running_jobs)
    else
      running_jobs
    end
  end

  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0), do: running_jobs

  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
       when available_job_slots > 0 do
    queue_table
    |> ets_pop_n_expired(current_time, available_job_slots)
    |> Enum.reduce(running_jobs, fn {_due_time, job}, acc ->
      {:ok, pid} = Task.start(fn -> worker(job) end)
      # Monitoring an already-exited pid still delivers `:DOWN` (reason `:noproc`),
      # so a worker that finishes before this line is still released from the set.
      :sets.add_element(Process.monitor(pid), acc)
    end)
  end

  @doc """
  Performs one delivery attempt for a queued `{:send, data, transport, retries}` job.

  On `{:ok, _}` from `transport.publish_one/1`, bumps the server's delivered counter
  and returns `:delivered`. On `{:error, _}`, re-enqueues the job via `enqueue/3` and
  returns `:retry`.
  """
  def worker({:send, data, transport, retries}) do
    case transport.publish_one(data) do
      {:ok, _} ->
        GenServer.cast(__MODULE__, :inc_delivered)
        :delivered

      {:error, _reason} ->
        enqueue(data, transport, retries)
        :retry
    end
  end

  @impl true
  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
  end

  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count},
     %{state | delivered: 0, dropped: 0}}
  end

  @impl true
  def handle_cast(:reset_stats, state) do
    {:noreply, %{state | delivered: 0, dropped: 0}}
  end

  def handle_cast(
        {:maybe_enqueue, data, transport, retries},
        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
        # The new entry may already be due, so try to start it right away.
        running_jobs = maybe_start_job(running_jobs, queue_table)
        {:noreply, %{state | running_jobs: running_jobs}}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  # Kept for compatibility with callers that cast `:kickoff_timer`. Each cast starts
  # an additional self-rescheduling timer chain.
  def handle_cast(:kickoff_timer, state) do
    schedule_retry_timer()
    {:noreply, state}
  end

  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
    {:noreply, %{state | delivered: delivery_count + 1}}
  end

  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
    {:noreply, %{state | dropped: drop_count + 1}}
  end

  # Legacy path: publishes synchronously inside the server process rather than in a
  # worker task.
  @impl true
  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  def handle_info(
        :retry_timer_run,
        %{queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    # Reschedule first so the timer chain survives even if starting jobs fails.
    schedule_retry_timer()
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

  # A worker finished (or was already dead when monitored): free its slot and try to
  # fill it with the next due job.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    %{running_jobs: running_jobs, queue_table: queue_table} = state
    running_jobs = :sets.del_element(ref, running_jobs)
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

  # Chosen at compile time: in the `:test` env every retry is due immediately
  # (one second in the past). Otherwise the delay is `initial_timeout * retries^3`
  # seconds from now.
  if Pleroma.Config.get(:env) == :test do
    defp growth_function(_retries) do
      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
      DateTime.to_unix(DateTime.utc_now()) - 1
    end
  else
    defp growth_function(retries) do
      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
        DateTime.to_unix(DateTime.utc_now())
    end
  end

  # Sends `:retry_timer_run` to `self()` after the configured interval. Must only be
  # called from within the server process.
  defp schedule_retry_timer do
    Process.send_after(self(), :retry_timer_run, get_retry_timer_interval())
  end
end