logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

context_objects_deletion_migrator.ex (4193B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  defmodule Pleroma.Migrators.ContextObjectsDeletionMigrator do
    @moduledoc """
    Background data migration that deletes context objects from the `objects` table.

    This migration removes objects created exclusively for contexts, containing only
    an `id` field. They are selected by `query/0` as the objects whose `data` has no
    `type` key.

    Progress is tracked through stats (`:max_processed_id`, `:processed_count`,
    `:failed_count`, `:affected_count`, ...) and ids that could not be deleted are
    stored in the `data_migration_failed_ids` table so `retry_failed/0` can revisit them.
    """

    defmodule State do
      # State holder for this migrator; the data migration record it tracks is the one
      # returned by `Pleroma.DataMigration.delete_context_objects/0`.
      use Pleroma.Migrators.Support.BaseMigratorState

      @impl Pleroma.Migrators.Support.BaseMigratorState
      defdelegate data_migration(), to: Pleroma.DataMigration, as: :delete_context_objects
    end

    use Pleroma.Migrators.Support.BaseMigrator

    alias Pleroma.Migrators.Support.BaseMigrator
    alias Pleroma.Object

    @doc "Returns the config path of the feature flag associated with this migration."
    @impl BaseMigrator
    def feature_config_path, do: [:features, :delete_context_objects]

    @doc """
    Returns the configured `[:delete_context_objects, :fault_rate_allowance]` value,
    defaulting to `0` when it is not set.
    """
    @impl BaseMigrator
    def fault_rate_allowance, do: Config.get([:delete_context_objects, :fault_rate_allowance], 0)

    @doc """
    Deletes every context object with an id greater than the `:max_processed_id` stat.

    Objects are streamed through `Repo.chunk_stream/4` with a chunk size of 100 in
    `:batches` mode; each batch is deleted, its failures are recorded, the stats are
    updated and the state is persisted before the next batch is fetched.
    """
    @impl BaseMigrator
    def perform do
      data_migration_id = data_migration_id()
      max_processed_id = get_stat(:max_processed_id, 0)

      Logger.info("Deleting context objects from `objects` (from oid: #{max_processed_id})...")

      query()
      |> where([object], object.id > ^max_processed_id)
      |> Repo.chunk_stream(100, :batches, timeout: :infinity)
      |> Stream.each(&process_batch(&1, data_migration_id))
      |> Stream.run()
    end

    @doc """
    Builds the query selecting the ids of all context objects.
    """
    @impl BaseMigrator
    def query do
      # Context objects have no activity type, and only one field, `id`.
      # Only those context objects are without types.
      from(
        object in Object,
        where: fragment("(?)->'type' IS NULL", object.data),
        select: %{
          id: object.id
        }
      )
    end

    @doc """
    Retries the deletion of every object listed in `data_migration_failed_ids` for this
    migration that still exists in `objects`.

    Ids that are deleted successfully are removed from the failed-ids table. Afterwards
    the `:failed_count` stat is reset to `failures_count/0`, the state is persisted and
    `force_continue/0` is called.
    """
    @impl BaseMigrator
    def retry_failed do
      data_migration_id = data_migration_id()

      failed_objects_query()
      |> Repo.chunk_stream(100, :one)
      |> Stream.each(fn object ->
        # A failed deletion falls through untouched, leaving the id in the failed-ids table.
        with {:ok, id} <- delete_context_object(object.id) do
          clear_failed_ids(data_migration_id, [id])
        end
      end)
      |> Stream.run()

      put_stat(:failed_count, failures_count())
      persist_state()

      force_continue()
    end

    # Deletes one batch of context objects, records the outcome and updates the stats.
    defp process_batch(objects, data_migration_id) do
      object_ids = Enum.map(objects, & &1.id)
      batch_size = length(object_ids)
      results = Enum.map(object_ids, &delete_context_object/1)

      failed_ids = for {:error, id} <- results, do: id
      chunk_affected_count = Enum.count(results, &match?({:ok, _}, &1))

      record_failed_ids(data_migration_id, failed_ids)
      # Ids that succeeded this time may have been recorded as failed by an earlier run.
      clear_failed_ids(data_migration_id, object_ids -- failed_ids)

      put_stat(:max_processed_id, List.last(object_ids))
      increment_stat(:iteration_processed_count, batch_size)
      increment_stat(:processed_count, batch_size)
      increment_stat(:failed_count, length(failed_ids))
      increment_stat(:affected_count, chunk_affected_count)
      put_stat(:records_per_second, records_per_second())
      persist_state()

      # A quick and dirty approach to controlling the load this background migration imposes
      sleep_interval = Config.get([:delete_context_objects, :sleep_interval_ms], 0)
      Process.sleep(sleep_interval)
    end

    # Stores each failed id for this migration; the query result is deliberately ignored.
    defp record_failed_ids(data_migration_id, failed_ids) do
      Enum.each(failed_ids, fn failed_id ->
        Repo.query(
          "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
            "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
          [data_migration_id, failed_id]
        )
      end)
    end

    # Removes the given ids from this migration's failed-ids rows; the result is ignored.
    defp clear_failed_ids(data_migration_id, ids) do
      _ =
        Repo.query(
          "DELETE FROM data_migration_failed_ids " <>
            "WHERE data_migration_id = $1 AND record_id = ANY($2)",
          [data_migration_id, ids]
        )

      :ok
    end

    # Deletes the object with the given id and tags the id with the `Repo.delete/1` status.
    @spec delete_context_object(integer()) :: {:ok | :error, integer()}
    defp delete_context_object(id) do
      {status, _} = Repo.delete(%Object{id: id})
      {status, id}
    end

    # Objects that are listed in `data_migration_failed_ids` for this migration, by ascending id.
    defp failed_objects_query do
      from(o in Object)
      |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
        on: dmf.record_id == o.id
      )
      |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
      |> order_by([o], asc: o.id)
    end
  end