logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

backup.ex (10457B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.User.Backup do
  5. use Ecto.Schema
  6. import Ecto.Changeset
  7. import Ecto.Query
  8. import Pleroma.Web.Gettext
  9. require Logger
  10. require Pleroma.Constants
  11. alias Pleroma.Activity
  12. alias Pleroma.Bookmark
  13. alias Pleroma.Repo
  14. alias Pleroma.User
  15. alias Pleroma.User.Backup.State
  16. alias Pleroma.Web.ActivityPub.ActivityPub
  17. alias Pleroma.Web.ActivityPub.Transmogrifier
  18. alias Pleroma.Web.ActivityPub.UserView
  19. alias Pleroma.Workers.BackupWorker
  # A persisted record describing one export archive requested for a user.
  @type t :: %__MODULE__{}

  schema "backups" do
    # MIME type of the archive; new/1 sets it to "application/zip".
    field(:content_type, :string)
    # Archive file name generated by new/1; also used as the storage path under "backups/".
    field(:file_name, :string)
    # Size of the finished zip in bytes, filled in by the processor after export.
    field(:file_size, :integer, default: 0)
    # Set to true by the processor once the archive has been uploaded.
    field(:processed, :boolean, default: false)
    # Lifecycle state. This file sets :pending (new/1), :running, :failed and :complete.
    field(:state, State, default: :invalid)
    # Running total of items reported through {:progress, n} messages while processing.
    field(:processed_number, :integer, default: 0)
    belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
    timestamps()
  end

  # Module used to read configuration in wait_backup/3. Resolved at compile time,
  # defaulting to Pleroma.Config (presumably overridable so tests can swap it — confirm).
  @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
  @doc """
  Inserts a new backup record for `user` and hands it to `BackupWorker.process/2`.

  The rate limit is checked first (see `validate_limit/2`); passing a binary
  `admin_id` skips it. On success the result of `BackupWorker.process/2` is
  returned. Otherwise the value of the first failing step is returned unchanged:
  `{:error, message}` from the limit check or the error from `Repo.insert/1`.
  """
  def create(user, admin_id \\ nil) do
    case validate_limit(user, admin_id) do
      :ok -> insert_and_enqueue(user, admin_id)
      rejected -> rejected
    end
  end

  # Persists a fresh backup row and schedules its processing.
  defp insert_and_enqueue(user, admin_id) do
    case Repo.insert(new(user)) do
      {:ok, backup} -> BackupWorker.process(backup, admin_id)
      failure -> failure
    end
  end
  @doc """
  Builds (without persisting) a pending backup struct for `user`.

  The archive name combines the user's nickname, the current UTC time in
  ISO 8601 basic format and 32 random bytes encoded as unpadded URL-safe Base64.
  """
  def new(user) do
    token =
      32
      |> :crypto.strong_rand_bytes()
      |> Base.url_encode64(padding: false)

    timestamp =
      NaiveDateTime.utc_now()
      |> Calendar.NaiveDateTime.Format.iso8601_basic()

    %__MODULE__{
      user_id: user.id,
      content_type: "application/zip",
      file_name: "archive-#{user.nickname}-#{timestamp}-#{token}.zip",
      state: :pending
    }
  end
  @doc """
  Deletes the stored archive of `backup` through the configured uploader and,
  only if that returns `:ok`, deletes the database record.

  Any other uploader result is returned unchanged and the record is kept.
  """
  def delete(backup) do
    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
    stored_path = Path.join("backups", backup.file_name)

    case uploader.delete_file(stored_path) do
      :ok -> Repo.delete(backup)
      other -> other
    end
  end
  # Exports requested with a binary `admin_id` are not rate limited.
  defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok

  # User-initiated exports: allowed when the user has no previous backup, or when
  # the most recent one is at least `:limit_days` days old. Otherwise returns
  # `{:error, translated_message}`.
  defp validate_limit(user, nil) do
    case get_last(user.id) do
      nil ->
        :ok

      %__MODULE__{inserted_at: inserted_at} ->
        days = Pleroma.Config.get([__MODULE__, :limit_days])
        # The difference is expressed in days, so an export made exactly `days`
        # days ago is no longer "less than `days` days ago". The comparison has to
        # be `>=` to stay consistent with the error message below.
        diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days)

        if diff >= days do
          :ok
        else
          {:error,
           dngettext(
             "errors",
             "Last export was less than a day ago",
             "Last export was less than %{days} days ago",
             days,
             days: days
           )}
        end
    end
  end
  @doc """
  Returns the backup with the highest id belonging to `user_id`, or `nil` when
  the user has none.
  """
  def get_last(user_id) do
    newest_first =
      from(b in __MODULE__,
        where: b.user_id == ^user_id,
        order_by: [desc: b.id],
        limit: 1
      )

    Repo.one(newest_first)
  end
  @doc """
  Returns all backups of the given user, ordered by descending id.
  """
  def list(%User{id: user_id}) do
    owned =
      from(b in __MODULE__,
        where: b.user_id == ^user_id,
        order_by: [desc: b.id]
      )

    Repo.all(owned)
  end
  @doc """
  Passes every backup of the same user except the given one to
  `BackupWorker.delete/1`. Returns `:ok`.
  """
  def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
    superseded =
      Repo.all(
        from(b in __MODULE__,
          where: b.user_id == ^user_id,
          where: b.id != ^latest_id
        )
      )

    Enum.each(superseded, fn old_backup -> BackupWorker.delete(old_backup) end)
  end
  @doc """
  Fetches a backup by primary key; returns `nil` when no such record exists.
  """
  def get(id) do
    Repo.get(__MODULE__, id)
  end
  # Persists a new `state` for `backup`. `processed_number` is only included in
  # the update when it is given (via Pleroma.Maps.put_if_present/3).
  # Returns the result of Repo.update/1.
  defp set_state(backup, state, processed_number \\ nil) do
    attrs = Pleroma.Maps.put_if_present(%{state: state}, :processed_number, processed_number)
    changeset = cast(backup, attrs, [:state, :processed_number])

    Repo.update(changeset)
  end
  @doc """
  Runs the export of `backup` in a supervised, unlinked task and waits for it.

  The record is first reset to state `:running` with a processed count of 0.
  Then `processor_module.do_process(backup, self())` is started under
  `Pleroma.TaskSupervisor`, and the calling process waits for the outcome while
  persisting the progress the task reports.

  Returns `{:ok, backup}` on success or `{:error, details_map}` on failure.
  """
  def process(
        %__MODULE__{} = backup,
        processor_module \\ __MODULE__.Processor
      ) do
    # Continue with the record as it was just persisted. The struct passed in can
    # carry a stale state and processed count (e.g. when a failed backup is
    # processed again), which would otherwise leak into the progress total and
    # into the changesets built from it later on.
    {:ok, backup} = set_state(backup, :running, 0)

    task =
      Task.Supervisor.async_nolink(
        Pleroma.TaskSupervisor,
        processor_module,
        :do_process,
        [backup, self()]
      )

    wait_backup(backup, backup.processed_number, task)
  end
  # Waits for the processor task, persisting progress as it is reported.
  #
  # Handled messages:
  #   * {:progress, n}     - adds n to the running total and keeps waiting
  #   * {ref, result}      - the task's reply; {:error, _} marks the backup failed
  #   * {:DOWN, ref, ...}  - the task ended without delivering a reply
  #
  # When nothing arrives for `:process_wait_time` milliseconds the task is
  # terminated and the backup is marked failed.
  #
  # Returns {:ok, backup} or {:error, %{backup: backup, reason: reason, ...}}.
  defp wait_backup(backup, current_processed, task) do
    wait_time = @config_impl.get([__MODULE__, :process_wait_time])
    # Pinned below so that only messages belonging to this task are consumed.
    ref = task.ref

    receive do
      {:progress, new_processed} ->
        total_processed = current_processed + new_processed
        set_state(backup, :running, total_processed)
        wait_backup(backup, total_processed, task)

      {^ref, {:error, details}} ->
        # The processor finished but reported a failure. Without this clause the
        # reply stayed unread and the normal exit was reported as a success.
        Process.demonitor(ref, [:flush])
        backup = get(backup.id)
        Logger.error("Backup #{backup.id} processing failed: #{inspect(details)}")
        fail_backup(backup, %{reason: :error, details: details})

      {^ref, _result} ->
        # Drop the monitor (and a :DOWN that may already be queued) so nothing
        # is left behind in the caller's mailbox.
        Process.demonitor(ref, [:flush])
        {:ok, get(backup.id)}

      {:DOWN, ^ref, _proc, _pid, reason} ->
        backup = get(backup.id)

        if reason != :normal do
          Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
          fail_backup(backup, %{reason: :exit, details: reason})
        else
          {:ok, backup}
        end
    after
      wait_time ->
        Logger.error(
          "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
        )

        Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
        # Killing the task produces a :DOWN message nobody would read otherwise.
        Process.demonitor(ref, [:flush])
        fail_backup(backup, %{reason: :timeout})
    end
  end

  # Marks `backup` as failed, removes its temporary directory and builds the
  # error tuple returned to the caller of process/2.
  defp fail_backup(backup, failure) do
    {:ok, backup} = set_state(backup, :failed)
    cleanup(backup)
    {:error, Map.put(failure, :backup, backup)}
  end
  # Members of the archive, as charlists because they are handed to Erlang's :zip.
  @files [~c"actor.json", ~c"outbox.json", ~c"likes.json", ~c"bookmarks.json"]

  @doc """
  Exports the user's data into a zip archive.

  Writes `actor.json`, `outbox.json`, `likes.json` and `bookmarks.json` into the
  backup's temporary directory, zips them under `backup.file_name` and removes
  the temporary directory. Progress is reported to `caller_pid` as
  `{:progress, count}` messages.

  Returns `{:ok, zip_path}` on success. On any failure the reason is logged,
  the temporary directory is removed and `:error` is returned.
  """
  @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
  def export(%__MODULE__{} = backup, caller_pid) do
    backup = Repo.preload(backup, :user)
    dir = backup_tempdir(backup)

    # mkdir_p also succeeds when the directory is left over from an earlier
    # failed attempt or when the configured parent directory does not exist yet.
    with :ok <- File.mkdir_p(dir),
         :ok <- actor(dir, backup.user, caller_pid),
         :ok <- statuses(dir, backup.user, caller_pid),
         :ok <- likes(dir, backup.user, caller_pid),
         :ok <- bookmarks(dir, backup.user, caller_pid),
         {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
         {:ok, _} <- File.rm_rf(dir) do
      {:ok, zip_path}
    else
      failure ->
        Logger.error("Backup #{backup.id} export failed: #{inspect(failure)}")
        # Do not leave partially written JSON files behind.
        File.rm_rf(dir)
        :error
    end
  end
  @doc """
  Returns the path of `name` inside the backup working directory.

  The base is the configured `:dir` of this module, falling back to
  `System.tmp_dir!/0` when that setting is `nil` or `false`.
  """
  def dir(name) do
    base =
      case Pleroma.Config.get([__MODULE__, :dir]) do
        unset when unset in [nil, false] -> System.tmp_dir!()
        configured -> configured
      end

    Path.join(base, name)
  end
  @doc """
  Stores the archive at `zip_path` through the configured uploader under
  `backups/<file_name>` and then removes the local file.

  Returns `{:ok, upload}` when both steps succeed. Otherwise the value of the
  failing step is returned unchanged.
  """
  def upload(%__MODULE__{} = backup, zip_path) do
    uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
    %__MODULE__{file_name: file_name, content_type: content_type} = backup

    upload = %Pleroma.Upload{
      name: file_name,
      tempfile: zip_path,
      content_type: content_type,
      path: Path.join("backups", file_name)
    }

    with {:ok, _stored} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload),
         :ok <- File.rm(zip_path) do
      {:ok, upload}
    end
  end
  # Writes actor.json: the rendered user document extended with references to
  # the likes and bookmarks collections. One unit of progress is reported to
  # `caller_pid` once the document has been encoded.
  # Returns the result of File.write/2, or the encoding error.
  defp actor(dir, user, caller_pid) do
    document =
      "user.json"
      |> UserView.render(%{user: user})
      |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})

    case Jason.encode(document) do
      {:ok, json} ->
        send(caller_pid, {:progress, 1})

        dir
        |> Path.join("actor.json")
        |> File.write(json)

      encode_error ->
        encode_error
    end
  end
  # Writes the opening part of an ActivityStreams OrderedCollection document,
  # up to and including the opening bracket of "orderedItems".
  defp write_header(file, name) do
    IO.write(
      file,
      """
      {
      "@context": "https://www.w3.org/ns/activitystreams",
      "id": "#{name}.json",
      "type": "OrderedCollection",
      "orderedItems": [
      """
    )
  end

  # True when `num` is a multiple of `chunk_size`, i.e. a full chunk was written.
  defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0

  # Working directory of a backup: its file name without the ".zip" suffix,
  # resolved through dir/1.
  defp backup_tempdir(backup) do
    name = String.trim_trailing(backup.file_name, ".zip")
    dir(name)
  end

  # Removes the backup's working directory and everything in it.
  defp cleanup(backup) do
    dir = backup_tempdir(backup)
    File.rm_rf(dir)
  end

  # Streams `query` in chunks and writes the results as "<name>.json" in `dir`.
  #
  # `fun` turns a row into `{:ok, encodable}`. Rows for which it raises, returns
  # an error, or whose result cannot be encoded are skipped (errors are logged).
  # Progress goes to `caller_pid` as `{:progress, n}`: one message per completed
  # chunk plus a final one for the remainder.
  #
  # Returns :ok, or the error from opening the file or writing the header.
  defp write(query, dir, name, fun, caller_pid) do
    path = Path.join(dir, "#{name}.json")
    chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])

    with {:ok, file} <- File.open(path, [:write, :utf8]) do
      try do
        with :ok <- write_header(file, name) do
          total =
            query
            |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
            |> Enum.reduce(0, fn i, acc ->
              # The separator goes in front of every item except the first.
              # The footer can then simply be appended, and a collection without
              # items still yields valid JSON instead of losing its "[" to a
              # seek-back overwrite.
              separator = if acc == 0, do: "", else: ",\n"

              with {:ok, data} <-
                     (try do
                        fun.(i)
                      rescue
                        e -> {:error, e}
                      end),
                   {:ok, str} <- Jason.encode(data),
                   :ok <- IO.write(file, separator <> str) do
                if should_report?(acc + 1, chunk_size) do
                  send(caller_pid, {:progress, chunk_size})
                end

                acc + 1
              else
                {:error, e} ->
                  Logger.warning(
                    "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
                  )

                  acc

                _ ->
                  acc
              end
            end)

          send(caller_pid, {:progress, rem(total, chunk_size)})

          IO.write(file, "\n],\n \"totalItems\": #{total}}")
        end
      after
        # Close the handle on every path, including header errors and raises.
        File.close(file)
      end
    end
  end
  # Writes bookmarks.json: the "object" value of the activity behind each of
  # the user's bookmarks.
  defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
    bookmarked_objects =
      from(b in Bookmark,
        where: b.user_id == ^user_id,
        join: a in assoc(b, :activity),
        select: %{id: b.id, object: fragment("(?)->>'object'", a.data)}
      )

    write(bookmarked_objects, dir, "bookmarks", fn row -> {:ok, row.object} end, caller_pid)
  end
  # Writes likes.json: the "object" value of every "Like" activity whose actor
  # is the user.
  defp likes(dir, user, caller_pid) do
    like_activities =
      user.ap_id
      |> Activity.Queries.by_actor()
      |> Activity.Queries.by_type("Like")

    liked_objects =
      from(like in like_activities,
        select: %{id: like.id, object: fragment("(?)->>'object'", like.data)}
      )

    write(liked_objects, dir, "likes", fn row -> {:ok, row.object} end, caller_pid)
  end
  # Writes outbox.json: the user's "Create" and "Announce" activities, each
  # prepared for federation with its "@context" key removed.
  defp statuses(dir, user, caller_pid) do
    opts = %{type: ["Create", "Announce"], actor_id: user.ap_id}

    recipients =
      Enum.concat([
        [Pleroma.Constants.as_public(), user.ap_id],
        User.following(user),
        Pleroma.List.memberships(user)
      ])

    to_outbox_item = fn a ->
      case Transmogrifier.prepare_outgoing(a.data) do
        {:ok, activity} -> {:ok, Map.delete(activity, "@context")}
        not_prepared -> not_prepared
      end
    end

    recipients
    |> ActivityPub.fetch_activities_query(opts)
    |> write(dir, "outbox", to_outbox_item, caller_pid)
  end
  296. end
  defmodule Pleroma.User.Backup.ProcessorAPI do
    @moduledoc """
    Behaviour for modules that carry out the processing of a backup.

    `Pleroma.User.Backup.process/2` calls `do_process/2` inside a supervised task.
    """

    alias Pleroma.User.Backup

    @doc """
    Processes `backup`; the pid is the process that receives progress messages.
    """
    @callback do_process(Backup.t(), pid()) :: {:ok, Backup.t()} | {:error, any()}
  end
  defmodule Pleroma.User.Backup.Processor do
    @moduledoc """
    Default backup processor: exports the archive, uploads it and marks the
    backup record as complete.
    """

    @behaviour Pleroma.User.Backup.ProcessorAPI

    alias Pleroma.Repo
    alias Pleroma.User.Backup

    import Ecto.Changeset

    @doc """
    Exports and uploads the archive for `backup`, then stores its size and sets
    `processed: true` and `state: :complete`.

    Returns the result of `Repo.update/1` on success. If any step before the
    update fails, its value `e` is returned wrapped as `{:error, e}`.
    """
    @impl true
    def do_process(backup, current_pid) do
      with {:ok, zip_file} <- Backup.export(backup, current_pid),
           {:ok, %{size: size}} <- File.stat(zip_file),
           {:ok, _upload} <- Backup.upload(backup, zip_file) do
        mark_complete(backup, size)
      else
        e -> {:error, e}
      end
    end

    # Persists the final size and the completed state of the backup.
    defp mark_complete(backup, size) do
      completed = %{file_size: size, processed: true, state: :complete}

      backup
      |> cast(completed, [:file_size, :processed, :state])
      |> Repo.update()
    end
  end