logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma
commit: 27110793a7d8e3aa6434bde63a1f687424ae779c
parent: 27e914955ebf150a08bcda04545f7df591dae151
Author: lain <lain@soykaf.club>
Date:   Fri,  4 Oct 2019 12:52:11 +0000

Merge branch 'featrue/job-monitor' into 'develop'

Add Pleroma.JobQueueMonitor

Closes #1274

See merge request pleroma/pleroma!1721

Diffstat:

MCHANGELOG.md1+
Mdocs/API/pleroma_api.md9+++++----
Mlib/pleroma/application.ex1+
Mlib/pleroma/healthcheck.ex8++++++++
Alib/pleroma/job_queue_monitor.ex78++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtest/healthcheck_test.exs9++++++++-
Atest/job_queue_monitor_test.exs70++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 171 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [Unreleased] ### Added - Refreshing poll results for remote polls +- Job queue stats to the healthcheck page - Admin API: Add ability to require password reset - Mastodon API: Account entities now include `follow_requests_count` (planned Mastodon 3.x addition) - Pleroma API: `GET /api/v1/pleroma/accounts/:id/scrobbles` to get a list of recently scrobbled items diff --git a/docs/API/pleroma_api.md b/docs/API/pleroma_api.md @@ -317,7 +317,8 @@ See [Admin-API](admin_api.md) "active": 0, # active processes "idle": 0, # idle processes "memory_used": 0.00, # Memory used - "healthy": true # Instance state + "healthy": true, # Instance state + "job_queue_stats": {} # Job queue stats } ``` @@ -391,7 +392,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa ### Update a file in a custom emoji pack * Method `POST` * Authentication: required -* Params: +* Params: * if the `action` is `add`, adds an emoji named `shortcode` to the pack `pack_name`, that means that the emoji file needs to be uploaded with the request (thus requiring it to be a multipart request) and be named `file`. @@ -408,7 +409,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa ### Updates (replaces) pack metadata * Method `POST` * Authentication: required -* Params: +* Params: * `new_data`: new metadata to replace the old one * Response: JSON, updated "metadata" section of the pack and 200 status or 400 if there was a problem with the new metadata (the error is specified in the "error" part of the response JSON) @@ -417,7 +418,7 @@ The status posting endpoint takes an additional parameter, `in_reply_to_conversa ### Requests the instance to download the pack from another instance * Method `POST` * Authentication: required -* Params: +* Params: * `instance_address`: the address of the instance to download from * `pack_name`: the pack to download from that instance * Response: JSON, "ok" and 200 status if the pack was downloaded, or 500 if there were diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex @@ -42,6 +42,7 @@ defmodule Pleroma.Application do hackney_pool_children() ++ [ Pleroma.Stats, + Pleroma.JobQueueMonitor, {Oban, Pleroma.Config.get(Oban)} ] ++ task_children(@env) ++ diff --git a/lib/pleroma/healthcheck.ex b/lib/pleroma/healthcheck.ex @@ -14,6 +14,7 @@ defmodule Pleroma.Healthcheck do active: 0, idle: 0, memory_used: 0, + job_queue_stats: nil, healthy: true @type t :: %__MODULE__{ @@ -21,6 +22,7 @@ defmodule Pleroma.Healthcheck do active: non_neg_integer(), idle: non_neg_integer(), memory_used: number(), + job_queue_stats: map(), healthy: boolean() } @@ -30,6 +32,7 @@ defmodule Pleroma.Healthcheck do memory_used: Float.round(:erlang.memory(:total) / 1024 / 1024, 2) } |> assign_db_info() + |> assign_job_queue_stats() |> check_health() end @@ -55,6 +58,11 @@ defmodule Pleroma.Healthcheck do Map.merge(healthcheck, db_info) end + defp assign_job_queue_stats(healthcheck) do + stats = Pleroma.JobQueueMonitor.stats() + Map.put(healthcheck, :job_queue_stats, stats) + end + @spec check_health(Healthcheck.t()) :: Healthcheck.t() def check_health(%{pool_size: pool_size, active: active} = check) when active >= pool_size do diff --git a/lib/pleroma/job_queue_monitor.ex b/lib/pleroma/job_queue_monitor.ex @@ -0,0 +1,78 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.JobQueueMonitor do + use GenServer + + @initial_state %{workers: %{}, queues: %{}, processed_jobs: 0} + @queue %{processed_jobs: 0, success: 0, failure: 0} + @operation %{processed_jobs: 0, success: 0, failure: 0} + + def start_link(_) do + GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__) + end + + @impl true + def init(state) do + :telemetry.attach("oban-monitor-failure", [:oban, :failure], &handle_event/4, nil) + :telemetry.attach("oban-monitor-success", [:oban, :success], &handle_event/4, nil) + + {:ok, state} + end + + def stats do + GenServer.call(__MODULE__, :stats) + end + + def handle_event([:oban, status], %{duration: duration}, meta, _) do + GenServer.cast(__MODULE__, {:process_event, status, duration, meta}) + end + + @impl true + def handle_call(:stats, _from, state) do + {:reply, state, state} + end + + @impl true + def handle_cast({:process_event, status, duration, meta}, state) do + state = + state + |> Map.update!(:workers, fn workers -> + workers + |> Map.put_new(meta.worker, %{}) + |> Map.update!(meta.worker, &update_worker(&1, status, meta, duration)) + end) + |> Map.update!(:queues, fn workers -> + workers + |> Map.put_new(meta.queue, @queue) + |> Map.update!(meta.queue, &update_queue(&1, status, meta, duration)) + end) + |> Map.update!(:processed_jobs, &(&1 + 1)) + + {:noreply, state} + end + + defp update_worker(worker, status, meta, duration) do + worker + |> Map.put_new(meta.args["op"], @operation) + |> Map.update!(meta.args["op"], &update_op(&1, status, meta, duration)) + end + + defp update_op(op, :enqueue, _meta, _duration) do + op + |> Map.update!(:enqueued, &(&1 + 1)) + end + + defp update_op(op, status, _meta, _duration) do + op + |> Map.update!(:processed_jobs, &(&1 + 1)) + |> Map.update!(status, &(&1 + 1)) + end + + defp update_queue(queue, status, _meta, _duration) do + queue + |> Map.update!(:processed_jobs, &(&1 + 1)) + |> Map.update!(status, &(&1 + 1)) + end +end diff --git a/test/healthcheck_test.exs b/test/healthcheck_test.exs @@ -9,7 +9,14 @@ defmodule Pleroma.HealthcheckTest do test "system_info/0" do result = Healthcheck.system_info() |> Map.from_struct() - assert Map.keys(result) == [:active, :healthy, :idle, :memory_used, :pool_size] + assert Map.keys(result) == [ + :active, + :healthy, + :idle, + :job_queue_stats, + :memory_used, + :pool_size + ] end describe "check_health/1" do diff --git a/test/job_queue_monitor_test.exs b/test/job_queue_monitor_test.exs @@ -0,0 +1,70 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.JobQueueMonitorTest do + use ExUnit.Case, async: true + + alias Pleroma.JobQueueMonitor + + @success {:process_event, :success, 1337, + %{ + args: %{"op" => "refresh_subscriptions"}, + attempt: 1, + id: 339, + max_attempts: 5, + queue: "federator_outgoing", + worker: "Pleroma.Workers.SubscriberWorker" + }} + + @failure {:process_event, :failure, 22_521_134, + %{ + args: %{"op" => "force_password_reset", "user_id" => "9nJG6n6Nbu7tj9GJX6"}, + attempt: 1, + error: %RuntimeError{message: "oops"}, + id: 345, + kind: :exception, + max_attempts: 1, + queue: "background", + stack: [ + {Pleroma.Workers.BackgroundWorker, :perform, 2, + [file: 'lib/pleroma/workers/background_worker.ex', line: 31]}, + {Oban.Queue.Executor, :safe_call, 1, + [file: 'lib/oban/queue/executor.ex', line: 42]}, + {:timer, :tc, 3, [file: 'timer.erl', line: 197]}, + {Oban.Queue.Executor, :call, 2, [file: 'lib/oban/queue/executor.ex', line: 23]}, + {Task.Supervised, :invoke_mfa, 2, [file: 'lib/task/supervised.ex', line: 90]}, + {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]} + ], + worker: "Pleroma.Workers.BackgroundWorker" + }} + + test "stats/0" do + assert %{processed_jobs: _, queues: _, workers: _} = JobQueueMonitor.stats() + end + + test "handle_cast/2" do + state = %{workers: %{}, queues: %{}, processed_jobs: 0} + + assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state) + assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state) + assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state) + assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state) + + assert state == %{ + processed_jobs: 4, + queues: %{ + "background" => %{failure: 2, processed_jobs: 2, success: 0}, + "federator_outgoing" => %{failure: 0, processed_jobs: 2, success: 2} + }, + workers: %{ + "Pleroma.Workers.BackgroundWorker" => %{ + "force_password_reset" => %{failure: 2, processed_jobs: 2, success: 0} + }, + "Pleroma.Workers.SubscriberWorker" => %{ + "refresh_subscriptions" => %{failure: 0, processed_jobs: 2, success: 2} + } + } + } + end +end