logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

database.ex (7570B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Mix.Tasks.Pleroma.Database do
  5. alias Pleroma.Conversation
  6. alias Pleroma.Maintenance
  7. alias Pleroma.Object
  8. alias Pleroma.Repo
  9. alias Pleroma.User
  10. require Logger
  11. require Pleroma.Constants
  12. import Ecto.Query
  13. import Mix.Pleroma
  14. use Mix.Task
  15. @shortdoc "A collection of database related tasks"
  16. @moduledoc File.read!("docs/administration/CLI_tasks/database.md")
  # `remove_embedded_objects`: rewrites `activities.data` so that an embedded
  # `object` map is replaced by that object's `id` (done in SQL through
  # `safe_jsonb_set`), then optionally runs `Maintenance.vacuum("full")`.
  #
  # Accepted switch: `--vacuum` (boolean). The assertive match below fails on
  # any positional argument or unrecognised switch.
  def run(["remove_embedded_objects" | args]) do
    {options, [], []} = OptionParser.parse(args, strict: [vacuum: :boolean])

    start_pleroma()
    Logger.info("Removing embedded objects")

    strip_embedded_sql =
      "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;"

    # The statement touches the whole activities table, so no query timeout is applied.
    Repo.query!(strip_embedded_sql, [], timeout: :infinity)

    if options[:vacuum], do: Maintenance.vacuum("full")
  end
  # `bump_all_conversations`: boots the application and delegates to
  # `Conversation.bump_for_all_activities/0`.
  def run(["bump_all_conversations"]) do
    start_pleroma()

    Conversation.bump_for_all_activities()
  end
  # `update_users_following_followers_counts`: streams every user row and calls
  # `User.update_follower_count/1` on each one. `Repo.stream/1` must run inside
  # a transaction, hence the wrapper; the transaction has no timeout.
  def run(["update_users_following_followers_counts"]) do
    start_pleroma()

    refresh_all_counts = fn ->
      all_users = from(u in User, select: u)

      all_users
      |> Repo.stream()
      |> Enum.each(&User.update_follower_count/1)
    end

    Repo.transaction(refresh_all_counts, timeout: :infinity)
  end
  # `prune_objects`: deletes public objects from remote actors that are older
  # than `[:instance, :remote_post_retention_days]` days, removes hashtags that
  # no longer reference any object, and optionally runs a full vacuum.
  #
  # Accepted switch: `--vacuum` (boolean). The assertive match below fails on
  # any positional argument or unrecognised switch.
  def run(["prune_objects" | args]) do
    {options, [], []} = OptionParser.parse(args, strict: [vacuum: :boolean])

    start_pleroma()

    deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])

    Logger.info("Pruning objects older than #{deadline} days")

    # 86_400 seconds per day: anything inserted before this instant is eligible.
    time_deadline =
      NaiveDateTime.utc_now()
      |> NaiveDateTime.add(-(deadline * 86_400))

    from(o in Object,
      # Only objects addressed to the public collection (in `to` or `cc`).
      where:
        fragment(
          "?->'to' \\? ? OR ?->'cc' \\? ?",
          o.data,
          ^Pleroma.Constants.as_public(),
          o.data,
          ^Pleroma.Constants.as_public()
        ),
      where: o.inserted_at < ^time_deadline,
      # Only objects whose actor URL host differs from this instance's host.
      where:
        fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
    )
    |> Repo.delete_all(timeout: :infinity)

    prune_hashtags_query = """
    DELETE FROM hashtags AS ht
    WHERE NOT EXISTS (
      SELECT 1 FROM hashtags_objects hto
      WHERE ht.id = hto.hashtag_id)
    """

    # Like the object deletion above, this can scan a large table, so it gets no
    # query timeout. A failure is reported instead of being silently dropped,
    # but it does not abort the task: the objects are already pruned.
    case Repo.query(prune_hashtags_query, [], timeout: :infinity) do
      {:ok, _result} ->
        :ok

      {:error, error} ->
        Logger.error("Pruning orphaned hashtags failed: #{inspect(error)}")
    end

    if Keyword.get(options, :vacuum) do
      Maintenance.vacuum("full")
    end
  end
  # `fix_likes_collections`: finds objects whose `likes` field decodes to a JSON
  # map and resets that field to an empty JSON array. Candidates are processed
  # in batches of 100.
  def run(["fix_likes_collections"]) do
    start_pleroma()

    objects_with_likes =
      from(object in Object,
        where: fragment("(?)->>'likes' is not null", object.data),
        select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
      )

    reset_map_likes = fn batch ->
      # Keep only the rows whose `likes` value is a map rather than a list.
      ids =
        for %{id: id, likes: likes} <- batch,
            likes |> Jason.decode!() |> is_map(),
            do: id

      from(object in Object,
        where: object.id in ^ids,
        update: [
          set: [
            data:
              fragment(
                "safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)",
                object.data
              )
          ]
        ]
      )
      |> Repo.update_all([], timeout: :infinity)
    end

    objects_with_likes
    |> Pleroma.Repo.chunk_stream(100, :batches)
    |> Stream.each(reset_map_likes)
    |> Stream.run()
  end
  # `vacuum <mode>`: boots the application and hands the single extra argument
  # to `Maintenance.vacuum/1` unchanged.
  def run(["vacuum", vacuum_mode]) do
    start_pleroma()

    Maintenance.vacuum(vacuum_mode)
  end
  # `ensure_expiration`: for every local `Create` activity whose object is a
  # `Note`, enqueues a `PurgeExpiredActivity` job that expires
  # `[:mrf_activity_expiration, :days]` days (default 365) after the activity
  # was inserted. Activities are walked in batches of 100.
  def run(["ensure_expiration"]) do
    start_pleroma()
    days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)

    schedule_purge = fn activity ->
      inserted_at_utc = DateTime.from_naive!(activity.inserted_at, "Etc/UTC")

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: activity.id,
        expires_at: Timex.shift(inserted_at_utc, days: days)
      })
    end

    local_note_creations =
      Pleroma.Activity
      |> join(:inner, [a], o in Object,
        on:
          fragment(
            "(?->>'id') = associated_object_id((?))",
            o.data,
            a.data
          )
      )
      |> where(local: true)
      |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
      |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))

    local_note_creations
    |> Pleroma.Repo.chunk_stream(100, :batches)
    |> Stream.each(&Enum.each(&1, schedule_purge))
    |> Stream.run()
  end
  # `set_text_search_config <tsconfig>`: sets `default_text_search_config` for
  # the current database and rebuilds the full-text search index so that it
  # uses the new configuration.
  #
  # With `[:database, :rum_enabled]` set, the `objects_fts_update()` trigger
  # function is redefined and every object row is touched; otherwise the
  # `objects_fts` GIN index is dropped and recreated for `tsconfig`.
  def run(["set_text_search_config", tsconfig]) do
    start_pleroma()

    %{rows: [[tsc]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SHOW default_text_search_config;")
    shell_info("Current default_text_search_config: #{tsc}")

    %{rows: [[db]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SELECT current_database();")
    shell_info("Update default_text_search_config: #{tsconfig}")

    # DDL statements cannot take bind parameters, so both values are embedded
    # in the SQL text. The database name is written as a quoted identifier so
    # that mixed-case or hyphenated names work, and the configuration name is
    # written as a properly escaped string literal.
    quoted_db = "\"" <> String.replace(db, "\"", "\"\"") <> "\""
    quoted_tsconfig = "'" <> String.replace(tsconfig, "'", "''") <> "'"

    %{messages: msg} =
      Ecto.Adapters.SQL.query!(
        Pleroma.Repo,
        "ALTER DATABASE #{quoted_db} SET default_text_search_config = #{quoted_tsconfig};"
      )

    # A non-existent configuration does not raise; it only produces one or more
    # server messages, so a non-empty message list is treated as failure.
    if msg != [] do
      shell_info("Error: #{inspect(msg, pretty: true)}")
    else
      rum_enabled = Pleroma.Config.get([:database, :rum_enabled])
      shell_info("Recreate index, RUM: #{rum_enabled}")

      # Note: the SQL below needs to be kept up to date with the latest GIN or
      # RUM index definition.
      if rum_enabled do
        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          """
          CREATE OR REPLACE FUNCTION objects_fts_update() RETURNS trigger AS $$ BEGIN
          new.fts_content := to_tsvector(new.data->>'content');
          RETURN new;
          END
          $$ LANGUAGE plpgsql
          """,
          [],
          timeout: :infinity
        )

        shell_info("Refresh RUM index")
        Ecto.Adapters.SQL.query!(Pleroma.Repo, "UPDATE objects SET updated_at = NOW();")
      else
        Ecto.Adapters.SQL.query!(Pleroma.Repo, "DROP INDEX IF EXISTS objects_fts;")

        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "CREATE INDEX CONCURRENTLY objects_fts ON objects USING gin(to_tsvector(#{quoted_tsconfig}, data->>'content')); ",
          [],
          timeout: :infinity
        )
      end

      shell_info("Done.")
    end
  end
  # Reverses one specific migration, identified by its version prefix, while
  # leaving all later migrations applied.
  # WARNING: this carries a risk of unrecoverable data loss — proceed at your own responsibility.
  # Approach based on https://stackoverflow.com/a/53825840
  def run(["rollback", version]) do
    warning = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?"
    answer = shell_prompt(warning, "n")

    if answer in ["Yn", "Y", "y"] do
      reverse_migration = fn repo ->
        version = String.to_integer(version)
        file_pattern = ~r/^#{version}_.*\.exs/
        migrations_dir = Ecto.Migrator.migrations_path(repo)

        migration_file =
          migrations_dir
          |> File.ls!()
          |> Enum.find(&String.match?(&1, file_pattern))

        # Each step is tagged so the `else` branch can report which one failed.
        with {_, file} when is_binary(file) <- {:find, migration_file},
             {_, [{migration, _} | _]} <-
               {:compile, Code.compile_file(Path.join(migrations_dir, file))},
             {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, migration)} do
          {:ok, "Reversed migration: #{file}"}
        else
          {:find, _} ->
            {:error, "No migration found with version prefix: #{version}"}

          {:compile, reason} ->
            {:error, "Problem compiling migration module: #{inspect(reason)}"}

          {:rollback, reason} ->
            {:error, "Problem reversing migration: #{inspect(reason)}"}
        end
      end

      {_, result, _} = Ecto.Migrator.with_repo(Pleroma.Repo, reverse_migration)

      shell_info(inspect(result))
    end
  end
  219. end