logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

job_queue_monitor.ex (2354B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  defmodule Pleroma.JobQueueMonitor do
    @moduledoc """
    Keeps in-memory counters of finished Oban jobs.

    On start the server attaches to the `[:oban, :job, :stop]` and
    `[:oban, :job, :exception]` telemetry events. Every event is counted as a
    `:success` or a `:failure` respectively, in three places: globally, per
    queue, and per worker and `"op"` job argument.

    The counters live only in this process' state and start from zero each time
    the server is started. The state returned by `stats/0` has the shape:

        %{
          processed_jobs: non_neg_integer(),
          queues: %{queue => counters},
          workers: %{worker => %{op => counters}}
        }

    where `counters` is `%{processed_jobs: n, success: n, failure: n}`.
    """

    use GenServer

    @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0}

    # Zeroed counters used for every queue and every worker operation the first
    # time it is seen.
    @counters %{processed_jobs: 0, success: 0, failure: 0}

    @type status :: :success | :failure
    @type counters :: %{
            processed_jobs: non_neg_integer(),
            success: non_neg_integer(),
            failure: non_neg_integer()
          }

    # Client API

    @doc """
    Starts the monitor and registers it under the module name.

    The argument is ignored; the server always starts with empty counters.
    """
    @spec start_link(term()) :: GenServer.on_start()
    def start_link(_) do
      GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
    end

    @doc """
    Returns the current counters (the whole server state).
    """
    @spec stats() :: map()
    def stats do
      GenServer.call(__MODULE__, :stats)
    end

    @doc """
    Telemetry handler for `[:oban, :job, :stop]` and `[:oban, :job, :exception]`.

    Runs in the process that emitted the event, so it only forwards the event to
    the monitor with a cast and returns immediately. Measurements must contain
    `:duration`.
    """
    @spec handle_event([atom()], map(), map(), term()) :: :ok
    def handle_event([:oban, :job, event], %{duration: duration}, meta, _config) do
      GenServer.cast(__MODULE__, {:process_event, mapping_status(event), duration, meta})
    end

    # Server callbacks

    @impl true
    def init(state) do
      # Remote captures (&Mod.fun/arity) are used on purpose: telemetry warns
      # about, and is slower with, local captures and anonymous functions.
      # The attach results are not matched, so a handler id that is already
      # registered does not make init/1 fail.
      :telemetry.attach(
        "oban-monitor-failure",
        [:oban, :job, :exception],
        &__MODULE__.handle_event/4,
        nil
      )

      :telemetry.attach(
        "oban-monitor-success",
        [:oban, :job, :stop],
        &__MODULE__.handle_event/4,
        nil
      )

      {:ok, state}
    end

    @impl true
    def handle_call(:stats, _from, state) do
      {:reply, state, state}
    end

    @impl true
    def handle_cast({:process_event, status, _duration, meta}, state) do
      # Jobs whose args carry no "op" key are grouped under the `nil` operation.
      op = meta.args["op"]

      # Access.key/2 supplies the default the first time a worker, operation or
      # queue is seen, so the entry is created and bumped in a single step.
      state =
        state
        |> update_in(
          [:workers, Access.key(meta.worker, %{}), Access.key(op, @counters)],
          &bump(&1, status)
        )
        |> update_in([:queues, Access.key(meta.queue, @counters)], &bump(&1, status))
        |> Map.update!(:processed_jobs, &(&1 + 1))

      {:noreply, state}
    end

    # Increments the total and the counter matching `status`.
    # Raises `KeyError` for a status the counters map does not track.
    @spec bump(counters(), status()) :: counters()
    defp bump(counters, status) do
      counters
      |> Map.update!(:processed_jobs, &(&1 + 1))
      |> Map.update!(status, &(&1 + 1))
    end

    # Translates the last segment of the Oban telemetry event name into the
    # counter key it increments.
    defp mapping_status(:stop), do: :success
    defp mapping_status(:exception), do: :failure
  end