context_objects_deletion_migrator.ex (4193B)
- # Pleroma: A lightweight social networking server
- # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
- # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do
  @moduledoc """
  This migration removes objects created exclusively for contexts,
  containing only an `id` field.

  Context objects are selected by `query/0` as rows of `objects` whose `data` has no
  `type` key. `perform/0` deletes them in chunks of 100 and stores the last handled id in
  the `:max_processed_id` stat, which it reads back as its starting point on the next run.

  Ids whose deletion reports `:error` are recorded in `data_migration_failed_ids` and can
  be re-attempted with `retry_failed/0`.

  Runtime configuration read by this module:

    * `[:delete_context_objects, :fault_rate_allowance]` (default `0`)
    * `[:delete_context_objects, :sleep_interval_ms]` (default `0`)
  """

  # State module of this migrator, built on `BaseMigratorState`.
  # `data_migration/0` delegates to `Pleroma.DataMigration.delete_context_objects/0`.
  defmodule State do
    use Pleroma.Migrators.Support.BaseMigratorState

    @impl Pleroma.Migrators.Support.BaseMigratorState
    defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects
  end

  use Pleroma.Migrators.Support.BaseMigrator

  alias Pleroma.Migrators.Support.BaseMigrator
  alias Pleroma.Object

  @doc "Returns the config path of the feature flag associated with this migration."
  @impl BaseMigrator
  def feature_config_path, do: [:features, :delete_context_objects]

  @doc """
  Returns the configured fault rate allowance, read from
  `[:delete_context_objects, :fault_rate_allowance]` (`0` when unset).
  """
  @impl BaseMigrator
  def fault_rate_allowance do
    Config.get([:delete_context_objects, :fault_rate_allowance], 0)
  end

  @doc """
  Deletes all context objects with an id greater than the `:max_processed_id` stat.

  Objects are streamed in chunks of 100; stats and state are updated and persisted after
  every chunk.
  """
  @impl BaseMigrator
  def perform do
    data_migration_id = data_migration_id()
    max_processed_id = get_stat(:max_processed_id, 0)

    Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...")

    query()
    |> where([object], object.id > ^max_processed_id)
    |> Repo.chunk_stream(100, :batches, timeout: :infinity)
    |> Stream.each(&delete_context_objects_chunk(&1, data_migration_id))
    |> Stream.run()
  end

  @doc """
  Returns the query selecting the ids of context objects.

  Each result is a map of the form `%{id: id}`.
  """
  @impl BaseMigrator
  def query do
    # Context objects have no activity type, and only one field, `id`.
    # Only those context objects are without types.
    from(
      object in Object,
      where: fragment("(?)->'type' IS NULL", object.data),
      select: %{id: object.id}
    )
  end

  @doc """
  Re-attempts the deletion of every object recorded as failed for this migration.

  Records of objects that no longer fail are removed from `data_migration_failed_ids`.
  Afterwards the `:failed_count` stat is refreshed from `failures_count/0`, the state is
  persisted and `force_continue/0` is called.
  """
  @impl BaseMigrator
  def retry_failed do
    data_migration_id = data_migration_id()

    failed_objects_query()
    |> Repo.chunk_stream(100, :one)
    |> Stream.each(fn object ->
      case delete_context_object(object.id) do
        {:error, _id} ->
          # Still failing: keep the record so it can be retried again later.
          :ok

        {_status, _id} ->
          Repo.query(
            "DELETE FROM data_migration_failed_ids " <>
              "WHERE data_migration_id = $1 AND record_id = $2",
            [data_migration_id, object.id]
          )
      end
    end)
    |> Stream.run()

    put_stat(:failed_count, failures_count())
    persist_state()

    force_continue()
  end

  # Deletes one chunk of context objects, syncs `data_migration_failed_ids` with the
  # outcome, updates the migration stats and persists the state.
  defp delete_context_objects_chunk(objects, data_migration_id) do
    object_ids = Enum.map(objects, & &1.id)

    {succeeded, failed} =
      object_ids
      |> Enum.map(&delete_context_object/1)
      |> Enum.split_with(fn {status, _id} -> status == :ok end)

    succeeded_ids = Enum.map(succeeded, &elem(&1, 1))
    failed_ids = Enum.map(failed, &elem(&1, 1))

    Enum.each(failed_ids, fn failed_id ->
      _ =
        Repo.query(
          "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
            "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
          [data_migration_id, failed_id]
        )
    end)

    # Drop any failure records that exist for the ids deleted in this chunk.
    _ =
      Repo.query(
        "DELETE FROM data_migration_failed_ids " <>
          "WHERE data_migration_id = $1 AND record_id = ANY($2)",
        [data_migration_id, succeeded_ids]
      )

    processed_count = length(object_ids)

    put_stat(:max_processed_id, List.last(object_ids))
    increment_stat(:iteration_processed_count, processed_count)
    increment_stat(:processed_count, processed_count)
    increment_stat(:failed_count, length(failed_ids))
    increment_stat(:affected_count, length(succeeded))
    put_stat(:records_per_second, records_per_second())
    persist_state()

    # A quick and dirty approach to controlling the load this background migration imposes
    sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0)
    Process.sleep(sleep_interval)
  end

  # Deletes the object with the given id and returns the status tag of `Repo.delete/1`
  # paired with that id.
  #
  # NOTE(review): `Repo.delete/1` presumably raises (instead of returning `{:error, _}`)
  # when the row is already gone or a constraint is violated; confirm whether such cases
  # should be recorded as failures rather than crash the migrator.
  @spec delete_context_object(integer()) :: {:ok | :error, integer()}
  defp delete_context_object(id) do
    {status, _result} = Repo.delete(%Object{id: id})
    {status, id}
  end

  # Objects that have a record in `data_migration_failed_ids` for this migration,
  # ordered by ascending object id.
  defp failed_objects_query do
    from(o in Object)
    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
      on: dmf.record_id == o.id
    )
    |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
    |> order_by([o], asc: o.id)
  end
end