logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

database.ex (11998B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Mix.Tasks.Pleroma.Database do
  5. alias Pleroma.Conversation
  6. alias Pleroma.Maintenance
  7. alias Pleroma.Object
  8. alias Pleroma.Repo
  9. alias Pleroma.User
  10. require Logger
  11. require Pleroma.Constants
  12. import Ecto.Query
  13. import Mix.Pleroma
  14. use Mix.Task
  15. @shortdoc "A collection of database related tasks"
  16. @moduledoc File.read!("docs/administration/CLI_tasks/database.md")
  # `remove_embedded_objects [--vacuum]`
  #
  # Rewrites every activity whose `data.object` carries an `id` so that
  # `data.object` holds just that id. When `--vacuum` is given, a full vacuum
  # is requested afterwards. Any unknown or positional argument makes the
  # option match fail.
  def run(["remove_embedded_objects" | args]) do
    {options, [], []} = OptionParser.parse(args, strict: [vacuum: :boolean])

    start_pleroma()
    Logger.info("Removing embedded objects")

    sql =
      "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;"

    # The statement touches the whole activities table, so no timeout is imposed.
    Repo.query!(sql, [], timeout: :infinity)

    if Keyword.get(options, :vacuum), do: Maintenance.vacuum("full")
  end
  # `bump_all_conversations`
  #
  # Boots Pleroma and delegates to `Conversation.bump_for_all_activities/0`.
  def run(["bump_all_conversations"]) do
    start_pleroma()

    Conversation.bump_for_all_activities()
  end
  # `update_users_following_followers_counts`
  #
  # Streams every user row inside a single transaction and calls
  # `User.update_follower_count/1` on each of them.
  def run(["update_users_following_followers_counts"]) do
    start_pleroma()

    recount_everyone = fn ->
      User
      |> select([u], u)
      |> Repo.stream()
      |> Enum.each(&User.update_follower_count/1)
    end

    # Repo.stream/1 must be consumed inside a transaction; the walk over all
    # users can take arbitrarily long, hence no timeout.
    Repo.transaction(recount_everyone, timeout: :infinity)
  end
  # `prune_objects [--keep-threads] [--keep-non-public] [--prune-orphaned-activities] [--vacuum]`
  #
  # Deletes objects older than `[:instance, :remote_post_retention_days]` days.
  #
  #   * `--keep-threads` - delete per context (thread): only contexts whose newest
  #     activity is older than the deadline, that contain no local activity and no
  #     bookmarked activity are pruned. Without it, objects are pruned one by one
  #     when their actor's host differs from this instance's host.
  #   * `--keep-non-public` - restrict deletion to publicly addressed posts.
  #   * `--prune-orphaned-activities` - additionally delete activities whose
  #     referenced object(s) match no object, activity or user.
  #   * `--vacuum` - request a full vacuum at the end.
  #
  # Hashtags that are no longer linked to any object are always removed.
  # Any unknown or positional argument makes the option match fail.
  def run(["prune_objects" | args]) do
    {options, [], []} =
      OptionParser.parse(
        args,
        strict: [
          vacuum: :boolean,
          keep_threads: :boolean,
          keep_non_public: :boolean,
          prune_orphaned_activities: :boolean
        ]
      )

    start_pleroma()

    keep_threads = Keyword.get(options, :keep_threads)
    keep_non_public = Keyword.get(options, :keep_non_public)

    deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])
    time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400))

    # One log line describing the run; a note is appended for every enabled switch.
    enabled_notes =
      for {switch, note} <- [
            keep_non_public: ", keeping non public posts",
            keep_threads: ", keeping threads intact",
            prune_orphaned_activities: ", pruning orphaned activities",
            vacuum:
              ", doing a full vacuum (you shouldn't do this as a recurring maintanance task)"
          ],
          Keyword.get(options, switch),
          do: note

    Logger.info(Enum.join(["Pruning objects older than #{deadline} days" | enabled_notes]))

    prunable_objects =
      if keep_threads do
        # We want to delete objects from threads where
        # 1. the newest post is still old
        # 2. none of the activities is local
        # 3. none of the activities is bookmarked
        # 4. optionally none of the posts is non-public
        contexts =
          Pleroma.Activity
          |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
          |> group_by([a], fragment("? ->> 'context'::text", a.data))

        contexts =
          if keep_non_public do
            having(
              contexts,
              [a],
              not fragment(
                # Posts (checked on Create Activity) is non-public
                "bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')",
                a.data,
                ^Pleroma.Constants.as_public(),
                a.data,
                ^Pleroma.Constants.as_public(),
                a.data
              )
            )
          else
            contexts
          end

        deletable_context =
          contexts
          |> having([a], max(a.updated_at) < ^time_deadline)
          |> having([a], not fragment("bool_or(?)", a.local))
          |> having([_, b], fragment("max(?::text) is null", b.id))
          |> select([a], fragment("? ->> 'context'::text", a.data))

        Pleroma.Object
        |> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
      else
        candidates =
          if keep_non_public do
            Pleroma.Object
            |> where(
              [o],
              fragment(
                "?->'to' \\? ? OR ?->'cc' \\? ?",
                o.data,
                ^Pleroma.Constants.as_public(),
                o.data,
                ^Pleroma.Constants.as_public()
              )
            )
          else
            Pleroma.Object
          end

        candidates
        |> where([o], o.updated_at < ^time_deadline)
        |> where(
          [o],
          fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
        )
      end

    Repo.delete_all(prunable_objects, timeout: :infinity)

    if !keep_threads do
      # Without the --keep-threads option, it's possible that bookmarked
      # objects have been deleted. We remove the corresponding bookmarks.
      """
      delete from public.bookmarks
      where id in (
        select b.id from public.bookmarks b
        left join public.activities a on b.activity_id = a.id
        left join public.objects o on a."data" ->> 'object' = o.data ->> 'id'
        where o.id is null
      )
      """
      |> Repo.query([], timeout: :infinity)
    end

    if Keyword.get(options, :prune_orphaned_activities) do
      # Prune activities who link to a single object
      # (this statement only considers non-local activities)
      """
      delete from public.activities
      where id in (
        select a.id from public.activities a
        left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
        left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
        left join public.users u on a.data ->> 'object' = u.ap_id
        where not a.local
        and jsonb_typeof(a."data" -> 'object') = 'string'
        and o.id is null
        and a2.id is null
        and u.id is null
      )
      """
      |> Repo.query([], timeout: :infinity)

      # Prune activities who link to an array of objects
      """
      delete from public.activities
      where id in (
        select a.id from public.activities a
        join json_array_elements_text((a."data" -> 'object')::json) as j on jsonb_typeof(a."data" -> 'object') = 'array'
        left join public.objects o on j.value = o.data ->> 'id'
        left join public.activities a2 on j.value = a2.data ->> 'id'
        left join public.users u on j.value = u.ap_id
        group by a.id
        having max(o.data ->> 'id') is null
        and max(a2.data ->> 'id') is null
        and max(u.ap_id) is null
      )
      """
      |> Repo.query([], timeout: :infinity)
    end

    # Drop hashtags no object refers to anymore. Like every other statement of
    # this task it runs without a timeout: with the default timeout it could be
    # aborted on a large table, and since the result is not inspected that
    # failure would go unnoticed.
    """
    DELETE FROM hashtags AS ht
    WHERE NOT EXISTS (
      SELECT 1 FROM hashtags_objects hto
      WHERE ht.id = hto.hashtag_id)
    """
    |> Repo.query([], timeout: :infinity)

    if Keyword.get(options, :vacuum) do
      Maintenance.vacuum("full")
    end
  end
  # `fix_likes_collections`
  #
  # Walks all objects that have a `likes` entry in batches of 100 and resets
  # `likes` to an empty JSON array wherever it currently decodes to a map.
  def run(["fix_likes_collections"]) do
    start_pleroma()

    Object
    |> where([o], fragment("(?)->>'likes' is not null", o.data))
    |> select([o], %{id: o.id, likes: fragment("(?)->>'likes'", o.data)})
    |> Pleroma.Repo.chunk_stream(100, :batches)
    |> Stream.each(fn batch ->
      # Only rows whose `likes` is a JSON object (decoded as a map) need fixing.
      broken_ids =
        for %{id: id, likes: likes} <- batch, is_map(Jason.decode!(likes)), do: id

      from(o in Object,
        where: o.id in ^broken_ids,
        update: [
          set: [data: fragment("safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)", o.data)]
        ]
      )
      |> Repo.update_all([], timeout: :infinity)
    end)
    |> Stream.run()
  end
  # `vacuum <mode>`
  #
  # Boots Pleroma and hands the single argument straight to
  # `Maintenance.vacuum/1`.
  def run(["vacuum", vacuum_mode]) do
    start_pleroma()

    Maintenance.vacuum(vacuum_mode)
  end
  # `ensure_expiration`
  #
  # Enqueues a `PurgeExpiredActivity` job for every local `Create` activity
  # whose object is a `Note`. The job is scheduled at the activity's insertion
  # time plus `[:mrf_activity_expiration, :days]` days (365 when unset).
  def run(["ensure_expiration"]) do
    start_pleroma()

    days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)

    schedule_purge = fn activity ->
      expires_at =
        activity.inserted_at
        |> DateTime.from_naive!("Etc/UTC")
        |> Timex.shift(days: days)

      Pleroma.Workers.PurgeExpiredActivity.enqueue(
        %{activity_id: activity.id},
        scheduled_at: expires_at
      )
    end

    local_note_creates =
      Pleroma.Activity
      |> join(:inner, [a], o in Object,
        on: fragment("(?->>'id') = associated_object_id((?))", o.data, a.data)
      )
      |> where(local: true)
      |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
      |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))

    # Activities are processed in batches of 100.
    local_note_creates
    |> Pleroma.Repo.chunk_stream(100, :batches)
    |> Stream.each(fn batch -> Enum.each(batch, schedule_purge) end)
    |> Stream.run()
  end
  # `set_text_search_config <tsconfig>`
  #
  # Sets `default_text_search_config` of the current database to `tsconfig`
  # and rebuilds the full text search index over object contents:
  #
  #   * with `[:database, :rum_enabled]` set, the `objects_fts_update()` trigger
  #     function is redefined and every object row is touched;
  #   * otherwise the `objects_fts` GIN index is dropped and recreated.
  #
  # If PostgreSQL answers the `ALTER DATABASE` with any message, it is printed
  # as an error and the index is left alone.
  def run(["set_text_search_config", tsconfig]) do
    start_pleroma()
    %{rows: [[tsc]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SHOW default_text_search_config;")
    shell_info("Current default_text_search_config: #{tsc}")

    %{rows: [[db]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SELECT current_database();")
    shell_info("Update default_text_search_config: #{tsconfig}")

    # These statements cannot take bind parameters, so the values are spliced
    # into the SQL text. The database name is emitted as a quoted identifier
    # (names with hyphens or upper-case letters would break otherwise) and the
    # config name as a string literal with embedded single quotes doubled.
    quoted_db = "\"" <> String.replace(db, "\"", "\"\"") <> "\""
    tsconfig_literal = String.replace(tsconfig, "'", "''")

    %{messages: msg} =
      Ecto.Adapters.SQL.query!(
        Pleroma.Repo,
        "ALTER DATABASE #{quoted_db} SET default_text_search_config = '#{tsconfig_literal}';"
      )

    # non-exist config will not raise exception but only give >0 messages
    if msg != [] do
      shell_info("Error: #{inspect(msg, pretty: true)}")
    else
      rum_enabled = Pleroma.Config.get([:database, :rum_enabled])
      shell_info("Recreate index, RUM: #{rum_enabled}")

      # Note SQL below needs to be kept up-to-date with latest GIN or RUM index definition in future
      if rum_enabled do
        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "CREATE OR REPLACE FUNCTION objects_fts_update() RETURNS trigger AS $$ BEGIN
          new.fts_content := to_tsvector(new.data->>'content');
          RETURN new;
          END
          $$ LANGUAGE plpgsql",
          [],
          timeout: :infinity
        )

        shell_info("Refresh RUM index")

        # Touches every row of `objects`; run without a timeout like the other
        # long-running statements here.
        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "UPDATE objects SET updated_at = NOW();",
          [],
          timeout: :infinity
        )
      else
        Ecto.Adapters.SQL.query!(Pleroma.Repo, "DROP INDEX IF EXISTS objects_fts;")

        Ecto.Adapters.SQL.query!(
          Pleroma.Repo,
          "CREATE INDEX CONCURRENTLY objects_fts ON objects USING gin(to_tsvector('#{tsconfig_literal}', data->>'content')); ",
          [],
          timeout: :infinity
        )
      end

      shell_info("Done.")
    end
  end
  # `rollback <version>`
  #
  # Reverses one specific migration while leaving later migrations applied.
  # WARNING: this can cause unrecoverable data loss, so the user must confirm
  # first; with any answer other than "Yn", "Y" or "y" nothing is done.
  # The migration file is looked up by its version prefix in the repo's
  # migrations directory, compiled, and run downwards; the outcome tuple is
  # printed.
  # Based on https://stackoverflow.com/a/53825840
  def run(["rollback", version]) do
    answer =
      shell_prompt(
        "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?",
        "n"
      )

    if answer in ["Yn", "Y", "y"] do
      reverse_migration = fn repo ->
        version = String.to_integer(version)
        file_pattern = ~r/^#{version}_.*\.exs/
        migrations_dir = Ecto.Migrator.migrations_path(repo)

        # Each step is tagged so the `else` clauses can tell which one failed.
        with {_, "" <> file} <-
               {:find, Enum.find(File.ls!(migrations_dir), &String.match?(&1, file_pattern))},
             {_, [{mod, _} | _]} <-
               {:compile, Code.compile_file(Path.join(migrations_dir, file))},
             {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do
          {:ok, "Reversed migration: #{file}"}
        else
          {:find, _} -> {:error, "No migration found with version prefix: #{version}"}
          {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"}
          {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"}
        end
      end

      {_, outcome, _} = Ecto.Migrator.with_repo(Pleroma.Repo, reverse_migration)

      shell_info(inspect(outcome))
    end
  end
  339. end