logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

jobs.ex (3904B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      defmodule Pleroma.Jobs do
        @moduledoc """
        A basic job queue.

        A single GenServer holds, for every queue named in the `Pleroma.Jobs`
        configuration, a set of monitor references for the jobs currently running
        and a priority-ordered list of pending jobs. Every job runs in its own
        `Task`; the server monitors it and, on the matching `:DOWN` message, starts
        the next pending job of that queue.

        The number of jobs a queue runs at once is read from the
        `[Pleroma.Jobs, queue_name, :max_jobs]` configuration path.
        """
        use GenServer

        require Logger

        @impl true
        def init(args) do
          {:ok, args}
        end

        @doc """
        Starts the job server and registers it under the module name.

        One empty queue is created for each `{name, _}` entry of the
        `Pleroma.Jobs` configuration.
        """
        def start_link do
          queues =
            Pleroma.Config.get(Pleroma.Jobs)
            |> Enum.map(fn {name, _} -> create_queue(name) end)
            |> Enum.into(%{})

          state = %{
            queues: queues,
            refs: %{}
          }

          GenServer.start_link(__MODULE__, state, name: __MODULE__)
        end

        @doc """
        Builds the initial `{name, {running_jobs, pending}}` entry for a queue:
        an empty `:sets` set of running-job monitor references and an empty list
        of pending jobs.
        """
        def create_queue(name) do
          {name, {:sets.new(), []}}
        end

        @doc """
        Enqueues a job.

        Returns `:ok`.

        When the module is compiled with `Mix.env() == :test`, the job is not
        queued: `perform` is applied synchronously in the caller and its result is
        returned instead.

        ## Arguments

        - `queue_name` - a queue name (must be specified in the config).
        - `mod` - a worker module (must have `perform` function).
        - `args` - a list of arguments for the `perform` function of the worker module.
        - `priority` - a job priority (`1` by default). Jobs with a lower number
          run first; jobs of equal priority run in the order they were enqueued.

        ## Examples

        Enqueue `Module.perform/0` with `priority=1`:

            iex> Pleroma.Jobs.enqueue(:example_queue, Module, [])
            :ok

        Enqueue `Module.perform(:job_name)` with `priority=5`:

            iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5)
            :ok

        Enqueue `Module.perform(:another_job, data)` with `priority=1`:

            iex> data = "foobar"
            iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data])
            :ok

        Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`:

            iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42])
            :ok

        """

        def enqueue(queue_name, mod, args, priority \\ 1)

        # The branch is chosen at compile time, so only one of the two clauses
        # below exists in the compiled module.
        if Mix.env() == :test do
          def enqueue(_queue_name, mod, args, _priority) do
            apply(mod, :perform, args)
          end
        else
          @spec enqueue(atom(), atom(), [any()], integer()) :: :ok
          def enqueue(queue_name, mod, args, priority) do
            GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority})
          end
        end

        @impl true
        def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do
          case Map.fetch(state.queues, queue_name) do
            {:ok, {running_jobs, queue}} ->
              queue = enqueue_sorted(queue, {mod, args}, priority)

              state =
                state
                |> update_queue(queue_name, {running_jobs, queue})
                |> maybe_start_job(queue_name, running_jobs, queue)

              {:noreply, state}

            :error ->
              # A failed match here would crash the server and discard the pending
              # jobs of every queue, so the offending job is logged and dropped.
              Logger.error(
                "#{inspect(__MODULE__)}: dropping job #{inspect({mod, args})} " <>
                  "for unknown queue #{inspect(queue_name)}"
              )

              {:noreply, state}
          end
        end

        @impl true
        def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
          case Map.fetch(state.refs, ref) do
            {:ok, queue_name} ->
              {running_jobs, queue} = state.queues[queue_name]

              # The finished job frees a slot in its queue.
              running_jobs = :sets.del_element(ref, running_jobs)

              state =
                state
                |> remove_ref(ref)
                |> update_queue(queue_name, {running_jobs, queue})
                |> maybe_start_job(queue_name, running_jobs, queue)

              {:noreply, state}

            :error ->
              # Not a monitor tracked by this server; nothing to clean up.
              {:noreply, state}
          end
        end

        @doc """
        Starts the first pending job of `queue_name` when the queue is not empty
        and fewer jobs are running than its configured `:max_jobs`.

        Returns the updated state, or `state` unchanged when no job was started.
        At most one job is started per call.
        """
        def maybe_start_job(state, queue_name, running_jobs, queue) do
          if queue != [] &&
               :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) do
            {{mod, args}, queue} = queue_pop(queue)
            {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end)
            # The monitor reference identifies the running job; `refs` maps it
            # back to its queue when the `:DOWN` message arrives.
            mref = Process.monitor(pid)

            state
            |> add_ref(queue_name, mref)
            |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue})
          else
            state
          end
        end

        @doc """
        Adds `element` with `priority` to `queue` and returns the queue sorted by
        ascending priority.

        The new entry is appended before a stable sort, so it lands behind
        existing entries of the same priority (first in, first out).
        """
        def enqueue_sorted(queue, element, priority) do
          (queue ++ [%{item: element, priority: priority}])
          |> Enum.sort_by(fn %{priority: priority} -> priority end)
        end

        @doc """
        Removes the first entry of a non-empty queue.

        Returns `{item, rest_of_queue}`.
        """
        def queue_pop([%{item: element} | queue]) do
          {element, queue}
        end

        # Records which queue the monitored job behind `ref` belongs to.
        defp add_ref(state, queue_name, ref) do
          refs = Map.put(state[:refs], ref, queue_name)
          Map.put(state, :refs, refs)
        end

        # Forgets a monitor reference once its job has terminated.
        defp remove_ref(state, ref) do
          refs = Map.delete(state[:refs], ref)
          Map.put(state, :refs, refs)
        end

        # Replaces the `{running_jobs, pending}` data stored for `queue_name`.
        defp update_queue(state, queue_name, data) do
          queues = Map.put(state[:queues], queue_name, data)
          Map.put(state, :queues, queues)
        end
      end