jobs.ex (3904B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Jobs do
  @moduledoc """
  A basic job queue.

  A single `GenServer`, registered under the module name, keeps one entry per
  queue. Each entry is a `{running_jobs, pending}` tuple where `running_jobs`
  is a `:sets` set of monitor references of the tasks currently running and
  `pending` is a list of `%{item: {mod, args}, priority: priority}` maps sorted
  by ascending priority.

  The set of queues is read from `Pleroma.Config.get(Pleroma.Jobs)` when the
  server is started; the per-queue concurrency limit is read from
  `Pleroma.Config.get([Pleroma.Jobs, queue_name, :max_jobs])` every time the
  server considers starting a job.
  """
  use GenServer

  require Logger

  @impl true
  def init(args) do
    {:ok, args}
  end

  @doc """
  Starts the job server and registers it under the module name.

  One empty queue is created for every `{name, _}` entry returned by
  `Pleroma.Config.get(Pleroma.Jobs)`.
  """
  def start_link do
    queues =
      __MODULE__
      |> Pleroma.Config.get()
      |> Map.new(fn {name, _} -> create_queue(name) end)

    state = %{
      queues: queues,
      refs: %{}
    }

    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  @doc """
  Builds the initial `{name, {running_jobs, pending}}` entry for a queue:
  an empty set of running jobs and an empty list of pending jobs.
  """
  def create_queue(name) do
    {name, {:sets.new(), []}}
  end

  @doc """
  Enqueues a job.

  Returns `:ok`.

  Jobs with a lower `priority` number are started first; jobs with the same
  priority are started in the order they were enqueued.

  When the module is compiled with `Mix.env() == :test` the job is not queued:
  `mod.perform(args...)` is executed synchronously in the caller and its result
  is returned instead of `:ok`.

  ## Arguments

  - `queue_name` - a queue name (must be specified in the config).
  - `mod` - a worker module (must have `perform` function).
  - `args` - a list of arguments for the `perform` function of the worker module.
  - `priority` - a job priority (`1` by default).

  ## Examples

  Enqueue `Module.perform/0` with `priority=1`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
      :ok

  Enqueue `Module.perform(:job_name)` with `priority=5`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
      :ok

  Enqueue `Module.perform(:another_job, data)` with `priority=1`:

      iex> data = "foobar"
      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
      :ok

  Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:

      iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
      :ok

  """

  def enqueue(queue_name, mod, args, priority \\ 1)

  # The branch is chosen at compile time: test builds run the job inline.
  if Mix.env() == :test do
    def enqueue(_queue_name, mod, args, _priority) do
      apply(mod, :perform, args)
    end
  else
    @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
    def enqueue(queue_name, mod, args, priority) do
      GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
    end
  end

  @impl true
  def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
    case state[:queues][queue_name] do
      {running_jobs, queue} ->
        queue = enqueue_sorted(queue, {mod, args}, priority)

        state =
          state
          |> update_queue(queue_name, {running_jobs, queue})
          |> maybe_start_job(queue_name, running_jobs, queue)

        {:noreply, state}

      nil ->
        # Crashing here would restart the server with empty queues and lose
        # every pending job, so only the offending job is dropped.
        Logger.error(
          "#{inspect(__MODULE__)}: dropping job #{inspect(mod)} for unknown queue " <>
            inspect(queue_name)
        )

        {:noreply, state}
    end
  end

  # A monitored job task has terminated (for any reason): release its slot and
  # start the next pending job of the same queue, if there is one.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    queue_name = state.refs[ref]

    {running_jobs, queue} = state[:queues][queue_name]

    running_jobs = :sets.del_element(ref, running_jobs)

    state =
      state
      |> remove_ref(ref)
      |> update_queue(queue_name, {running_jobs, queue})
      |> maybe_start_job(queue_name, running_jobs, queue)

    {:noreply, state}
  end

  @doc """
  Starts the first pending job of `queue_name` when fewer than `:max_jobs`
  jobs of that queue are running and `queue` is not empty.

  Returns the updated state, or `state` unchanged when no job was started.
  At most one job is started per call.
  """
  def maybe_start_job(state, queue_name, running_jobs, queue) do
    max_jobs = Pleroma.Config.get([__MODULE__, queue_name, :max_jobs])

    if :sets.size(running_jobs) < max_jobs && queue != [] do
      {{mod, args}, queue} = queue_pop(queue)
      {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
      # Monitoring after the start is fine: if the task has already exited,
      # the `:DOWN` message is delivered immediately.
      mref = Process.monitor(pid)

      state
      |> add_ref(queue_name, mref)
      |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
    else
      state
    end
  end

  @doc """
  Inserts `element` with the given `priority` into `queue` and returns the
  queue sorted by ascending priority.

  The new entry is appended before the (stable) sort, so it is placed after
  already queued entries of the same priority (FIFO within a priority).
  """
  @spec enqueue_sorted([map()], any(), any()) :: [map()]
  def enqueue_sorted(queue, element, priority) do
    (queue ++ [%{item: element, priority: priority}])
    |> Enum.sort_by(fn %{priority: priority} -> priority end)
  end

  @doc """
  Removes the first entry of a non-empty queue.

  Returns `{item, rest_of_queue}`.
  """
  @spec queue_pop([map(), ...]) :: {any(), [map()]}
  def queue_pop([%{item: element} | queue]) do
    {element, queue}
  end

  # Remembers which queue the job monitored by `ref` belongs to.
  defp add_ref(state, queue_name, ref) do
    %{state | refs: Map.put(state.refs, ref, queue_name)}
  end

  # Forgets the monitor reference of a finished job.
  defp remove_ref(state, ref) do
    %{state | refs: Map.delete(state.refs, ref)}
  end

  # Replaces the `{running_jobs, pending}` entry of `queue_name`.
  defp update_queue(state, queue_name, data) do
    %{state | queues: Map.put(state.queues, queue_name, data)}
  end
end