commit: 61a3b793165e92d6f23a2e59fb9b95e06737cf25
parent 895eea5c752212cb36fb6c908456cdd2f72ad2b7
Author: Mark Felder <feld@feld.me>
Date: Sat, 25 May 2024 14:20:47 -0400
Search backend healthcheck process
Diffstat:
8 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/changelog.d/search-healthcheck.add b/changelog.d/search-healthcheck.add
@@ -0,0 +1 @@
+Monitoring of search backend health to control the processing of jobs in the search indexing Oban queue
diff --git a/config/config.exs b/config/config.exs
@@ -579,7 +579,7 @@ config :pleroma, Oban,
attachments_cleanup: 1,
new_users_digest: 1,
mute_expire: 5,
- search_indexing: 10,
+ search_indexing: [limit: 10, paused: true],
rich_media_expiration: 2
],
plugins: [Oban.Plugins.Pruner],
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
@@ -109,7 +109,8 @@ defmodule Pleroma.Application do
streamer_registry() ++
background_migrators() ++
shout_child(shout_enabled?()) ++
- [Pleroma.Gopher.Server]
+ [Pleroma.Gopher.Server] ++
+ [Pleroma.Search.Healthcheck]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options
diff --git a/lib/pleroma/search.ex b/lib/pleroma/search.ex
@@ -14,4 +14,9 @@ defmodule Pleroma.Search do
search_module.search(options[:for_user], query, options)
end
+
+ def healthcheck_endpoints do
+ search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity)
+ search_module.healthcheck_endpoints
+ end
end
diff --git a/lib/pleroma/search/database_search.ex b/lib/pleroma/search/database_search.ex
@@ -48,6 +48,9 @@ defmodule Pleroma.Search.DatabaseSearch do
@impl true
def remove_from_index(_object), do: :ok
+ @impl true
+ def healthcheck_endpoints, do: nil
+
def maybe_restrict_author(query, %User{} = author) do
Activity.Queries.by_author(query, author)
end
diff --git a/lib/pleroma/search/healthcheck.ex b/lib/pleroma/search/healthcheck.ex
@@ -0,0 +1,85 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2024 Pleroma Authors <https://pleroma.social/>
+# SPDX-License-Identifier: AGPL-3.0-only
+defmodule Pleroma.Search.Healthcheck do
+ @doc """
+ Monitors health of search backend to control processing of events based on health and availability.
+ """
+ use GenServer
+ require Logger
+
+ @tick :timer.seconds(60)
+ @queue :search_indexing
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, [], name: __MODULE__)
+ end
+
+ @impl true
+ def init(_) do
+ state = %{healthy: false}
+ {:ok, state, {:continue, :start}}
+ end
+
+ @impl true
+ def handle_continue(:start, state) do
+ tick()
+ {:noreply, state}
+ end
+
+ @impl true
+ def handle_info(:check, state) do
+ urls = Pleroma.Search.healthcheck_endpoints()
+
+ new_state =
+ if healthy?(urls) do
+ Oban.resume_queue(queue: @queue)
+ Map.put(state, :healthy, true)
+ else
+ Oban.pause_queue(queue: @queue)
+ Map.put(state, :healthy, false)
+ end
+
+ maybe_log_state_change(state, new_state)
+
+ tick()
+ {:noreply, new_state}
+ end
+
+ @impl true
+ def handle_call(:check, _from, state) do
+ status = Map.get(state, :healthy)
+
+ {:reply, status, state, :hibernate}
+ end
+
+ defp healthy?([]), do: true
+
+ defp healthy?(urls) when is_list(urls) do
+ Enum.all?(
+ urls,
+ fn url ->
+ case Pleroma.HTTP.get(url) do
+ {:ok, %{status: 200}} -> true
+ _ -> false
+ end
+ end
+ )
+ end
+
+ defp healthy?(_), do: true
+
+ defp tick do
+ Process.send_after(self(), :check, @tick)
+ end
+
+ defp maybe_log_state_change(%{healthy: true}, %{healthy: false}) do
+ Logger.error("Pausing Oban queue #{@queue} due to search backend healthcheck failure")
+ end
+
+ defp maybe_log_state_change(%{healthy: false}, %{healthy: true}) do
+ Logger.info("Resuming Oban queue #{@queue} due to search backend healthcheck pass")
+ end
+
+ defp maybe_log_state_change(_, _), do: :ok
+end
diff --git a/lib/pleroma/search/meilisearch.ex b/lib/pleroma/search/meilisearch.ex
@@ -178,4 +178,15 @@ defmodule Pleroma.Search.Meilisearch do
def remove_from_index(object) do
meili_delete("/indexes/objects/documents/#{object.id}")
end
+
+ @impl true
+ def healthcheck_endpoints do
+ endpoint =
+ Config.get([Pleroma.Search.Meilisearch, :url])
+ |> URI.parse()
+ |> Map.put(:path, "/health")
+ |> URI.to_string()
+
+ [endpoint]
+ end
end
diff --git a/lib/pleroma/search/search_backend.ex b/lib/pleroma/search/search_backend.ex
@@ -21,4 +21,12 @@ defmodule Pleroma.Search.SearchBackend do
from index.
"""
@callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()}
+
+ @doc """
+ Healthcheck endpoints of search backend infrastructure to monitor for controlling
+ processing of jobs in the Oban queue.
+
+ It is expected a 200 response is healthy and other responses are unhealthy.
+ """
+ @callback healthcheck_endpoints :: list() | nil
end