backup.ex (10457B)
- # Pleroma: A lightweight social networking server
- # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
- # SPDX-License-Identifier: AGPL-3.0-only
- defmodule Pleroma.User.Backup do
- use Ecto.Schema
- import Ecto.Changeset
- import Ecto.Query
- import Pleroma.Web.Gettext
- require Logger
- require Pleroma.Constants
- alias Pleroma.Activity
- alias Pleroma.Bookmark
- alias Pleroma.Repo
- alias Pleroma.User
- alias Pleroma.User.Backup.State
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Transmogrifier
- alias Pleroma.Web.ActivityPub.UserView
- alias Pleroma.Workers.BackupWorker
- @type t :: %__MODULE__{}
  # Ecto schema for a user's exported-data archive (the "backups" table).
  schema "backups" do
    # MIME type of the generated archive; new/1 always sets "application/zip".
    field(:content_type, :string)
    # Final archive file name, e.g. "archive-<nickname>-<timestamp>-<rand>.zip".
    field(:file_name, :string)
    # Size of the finished archive in bytes; stays 0 until processing completes.
    field(:file_size, :integer, default: 0)
    # Set to true once the archive has been built and uploaded.
    field(:processed, :boolean, default: false)
    # Lifecycle state backed by the Pleroma.User.Backup.State Ecto enum.
    field(:state, State, default: :invalid)
    # Count of items exported so far; updated from :progress messages.
    field(:processed_number, :integer, default: 0)

    # Owner of the backup; Pleroma user ids are flake ids, hence the CompatType.
    belongs_to(:user, User, type: FlakeId.Ecto.CompatType)

    timestamps()
  end

  # Config access is indirected through a module attribute so tests can swap in
  # a mock implementation; resolved once at compile time via compile_env/3.
  @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)
- def create(user, admin_id \\ nil) do
- with :ok <- validate_limit(user, admin_id),
- {:ok, backup} <- user |> new() |> Repo.insert() do
- BackupWorker.process(backup, admin_id)
- end
- end
- def new(user) do
- rand_str = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
- datetime = Calendar.NaiveDateTime.Format.iso8601_basic(NaiveDateTime.utc_now())
- name = "archive-#{user.nickname}-#{datetime}-#{rand_str}.zip"
- %__MODULE__{
- user_id: user.id,
- content_type: "application/zip",
- file_name: name,
- state: :pending
- }
- end
- def delete(backup) do
- uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
- with :ok <- uploader.delete_file(Path.join("backups", backup.file_name)) do
- Repo.delete(backup)
- end
- end
- defp validate_limit(_user, admin_id) when is_binary(admin_id), do: :ok
- defp validate_limit(user, nil) do
- case get_last(user.id) do
- %__MODULE__{inserted_at: inserted_at} ->
- days = Pleroma.Config.get([__MODULE__, :limit_days])
- diff = Timex.diff(NaiveDateTime.utc_now(), inserted_at, :days)
- if diff > days do
- :ok
- else
- {:error,
- dngettext(
- "errors",
- "Last export was less than a day ago",
- "Last export was less than %{days} days ago",
- days,
- days: days
- )}
- end
- nil ->
- :ok
- end
- end
- def get_last(user_id) do
- __MODULE__
- |> where(user_id: ^user_id)
- |> order_by(desc: :id)
- |> limit(1)
- |> Repo.one()
- end
- def list(%User{id: user_id}) do
- __MODULE__
- |> where(user_id: ^user_id)
- |> order_by(desc: :id)
- |> Repo.all()
- end
- def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
- __MODULE__
- |> where(user_id: ^user_id)
- |> where([b], b.id != ^latest_id)
- |> Repo.all()
- |> Enum.each(&BackupWorker.delete/1)
- end
- def get(id), do: Repo.get(__MODULE__, id)
- defp set_state(backup, state, processed_number \\ nil) do
- struct =
- %{state: state}
- |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
- backup
- |> cast(struct, [:state, :processed_number])
- |> Repo.update()
- end
- def process(
- %__MODULE__{} = backup,
- processor_module \\ __MODULE__.Processor
- ) do
- set_state(backup, :running, 0)
- current_pid = self()
- task =
- Task.Supervisor.async_nolink(
- Pleroma.TaskSupervisor,
- processor_module,
- :do_process,
- [backup, current_pid]
- )
- wait_backup(backup, backup.processed_number, task)
- end
  # Waits for the export task, persisting progress updates as they arrive.
  #
  # Handles:
  #   * {:progress, n} - n more items were exported; store the new running
  #     total and recurse (which also restarts the `after` timeout).
  #   * {:DOWN, ...}   - the task exited; :normal means success, any other
  #     reason marks the backup :failed and removes its temp directory.
  #
  # If nothing is received within the configured :process_wait_time (ms), the
  # task is terminated and the backup is marked :failed.
  defp wait_backup(backup, current_processed, task) do
    wait_time = @config_impl.get([__MODULE__, :process_wait_time])

    receive do
      {:progress, new_processed} ->
        total_processed = current_processed + new_processed
        set_state(backup, :running, total_processed)
        wait_backup(backup, total_processed, task)

      {:DOWN, _ref, _proc, _pid, reason} ->
        # Re-fetch the record: the processor may have updated it before exiting.
        backup = get(backup.id)

        if reason != :normal do
          Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")

          {:ok, backup} = set_state(backup, :failed)

          cleanup(backup)

          {:error,
           %{
             backup: backup,
             reason: :exit,
             details: reason
           }}
        else
          {:ok, backup}
        end
    after
      wait_time ->
        Logger.error(
          "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
        )

        Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)

        {:ok, backup} = set_state(backup, :failed)

        cleanup(backup)

        {:error,
         %{
           backup: backup,
           reason: :timeout
         }}
    end
  end
- @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
- @spec export(Pleroma.User.Backup.t(), pid()) :: {:ok, String.t()} | :error
- def export(%__MODULE__{} = backup, caller_pid) do
- backup = Repo.preload(backup, :user)
- dir = backup_tempdir(backup)
- with :ok <- File.mkdir(dir),
- :ok <- actor(dir, backup.user, caller_pid),
- :ok <- statuses(dir, backup.user, caller_pid),
- :ok <- likes(dir, backup.user, caller_pid),
- :ok <- bookmarks(dir, backup.user, caller_pid),
- {:ok, zip_path} <- :zip.create(backup.file_name, @files, cwd: dir),
- {:ok, _} <- File.rm_rf(dir) do
- {:ok, zip_path}
- else
- _ -> :error
- end
- end
- def dir(name) do
- dir = Pleroma.Config.get([__MODULE__, :dir]) || System.tmp_dir!()
- Path.join(dir, name)
- end
- def upload(%__MODULE__{} = backup, zip_path) do
- uploader = Pleroma.Config.get([Pleroma.Upload, :uploader])
- upload = %Pleroma.Upload{
- name: backup.file_name,
- tempfile: zip_path,
- content_type: backup.content_type,
- path: Path.join("backups", backup.file_name)
- }
- with {:ok, _} <- Pleroma.Uploaders.Uploader.put_file(uploader, upload),
- :ok <- File.rm(zip_path) do
- {:ok, upload}
- end
- end
- defp actor(dir, user, caller_pid) do
- with {:ok, json} <-
- UserView.render("user.json", %{user: user})
- |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
- |> Jason.encode() do
- send(caller_pid, {:progress, 1})
- File.write(Path.join(dir, "actor.json"), json)
- end
- end
  # Writes the opening of an ActivityStreams OrderedCollection JSON document.
  # The "orderedItems" array is intentionally left open: write/5 appends one
  # item per line and later patches in the closing bracket plus "totalItems".
  defp write_header(file, name) do
    IO.write(
      file,
      """
      {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": "#{name}.json",
        "type": "OrderedCollection",
        "orderedItems": [
      """
    )
  end
- defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
- defp backup_tempdir(backup) do
- name = String.trim_trailing(backup.file_name, ".zip")
- dir(name)
- end
- defp cleanup(backup) do
- dir = backup_tempdir(backup)
- File.rm_rf(dir)
- end
  # Streams `query` results into "<name>.json" as an OrderedCollection,
  # reporting progress to `caller_pid`.
  #
  # Each row is mapped through `fun` (exceptions are rescued, logged, and the
  # row skipped), JSON-encoded, and appended as "<json>,\n". Once the stream
  # is exhausted, the trailing ",\n" is overwritten in place via
  # :file.pwrite at {:eof, -2} with the closing bracket and "totalItems",
  # producing valid JSON without buffering the whole collection in memory.
  defp write(query, dir, name, fun, caller_pid) do
    path = Path.join(dir, "#{name}.json")
    chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])

    with {:ok, file} <- File.open(path, [:write, :utf8]),
         :ok <- write_header(file, name) do
      total =
        query
        |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
        |> Enum.reduce(0, fn i, acc ->
          with {:ok, data} <-
                 (try do
                    fun.(i)
                  rescue
                    e -> {:error, e}
                  end),
               {:ok, str} <- Jason.encode(data),
               :ok <- IO.write(file, str <> ",\n") do
            # Report once per full chunk to keep message traffic bounded.
            if should_report?(acc + 1, chunk_size) do
              send(caller_pid, {:progress, chunk_size})
            end

            acc + 1
          else
            {:error, e} ->
              Logger.warning(
                "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
              )

              acc

            _ ->
              acc
          end
        end)

      # Flush the remainder that never filled a whole chunk.
      send(caller_pid, {:progress, rem(total, chunk_size)})

      # NOTE(review): when total == 0 this still trims the last 2 bytes of the
      # header -- presumably unreachable for real exports; verify upstream.
      with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
        File.close(file)
      end
    end
  end
  # Streams the user's bookmarks into bookmarks.json. Each row is reduced to
  # the AP id of the bookmarked object, extracted from the joined activity's
  # JSON data with the Postgres ->> operator.
  defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
    Bookmark
    |> where(user_id: ^user_id)
    |> join(:inner, [b], activity in assoc(b, :activity))
    |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
    |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
  end
  # Streams the user's Like activities into likes.json, keeping only the AP id
  # of the liked object (the activity data's "object" field).
  defp likes(dir, user, caller_pid) do
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
    |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
  end
- defp statuses(dir, user, caller_pid) do
- opts =
- %{}
- |> Map.put(:type, ["Create", "Announce"])
- |> Map.put(:actor_id, user.ap_id)
- [
- [Pleroma.Constants.as_public(), user.ap_id],
- User.following(user),
- Pleroma.List.memberships(user)
- ]
- |> Enum.concat()
- |> ActivityPub.fetch_activities_query(opts)
- |> write(
- dir,
- "outbox",
- fn a ->
- with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
- {:ok, Map.delete(activity, "@context")}
- end
- end,
- caller_pid
- )
- end
- end
defmodule Pleroma.User.Backup.ProcessorAPI do
  @moduledoc """
  Behaviour for backup processors.

  `c:do_process/2` builds and uploads the archive for the given backup,
  sending `{:progress, n}` messages to the provided pid while exporting.
  """

  # Use the schema's declared t() type rather than a bare struct literal so
  # the callback spec carries the field types.
  @callback do_process(Pleroma.User.Backup.t(), pid()) ::
              {:ok, Pleroma.User.Backup.t()} | {:error, any()}
end
defmodule Pleroma.User.Backup.Processor do
  @moduledoc """
  Default backup processor: exports the archive, uploads it, and marks the
  backup record as complete with its final file size.
  """
  @behaviour Pleroma.User.Backup.ProcessorAPI

  alias Pleroma.Repo
  alias Pleroma.User.Backup

  import Ecto.Changeset

  @impl true
  def do_process(backup, current_pid) do
    export_result =
      with {:ok, zip_file} <- Backup.export(backup, current_pid),
           {:ok, %{size: size}} <- File.stat(zip_file),
           {:ok, _upload} <- Backup.upload(backup, zip_file) do
        {:ok, size}
      end

    case export_result do
      {:ok, size} ->
        backup
        |> cast(%{file_size: size, processed: true, state: :complete}, [
          :file_size,
          :processed,
          :state
        ])
        |> Repo.update()

      failure ->
        # Any failed step's return value is wrapped as the error detail.
        {:error, failure}
    end
  end
end