logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma — git clone https://hacktivis.me/git/pleroma.git

repo.ex (2741B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  defmodule Pleroma.Repo do
    @moduledoc """
    Ecto repository for Pleroma, using the PostgreSQL adapter.

    In addition to the functions generated by `use Ecto.Repo`, this module
    defines helpers that return tagged tuples (`find_resource/1`,
    `get_assoc/2`) and an id-paginated lazy stream (`chunk_stream/4`).
    """

    use Ecto.Repo,
      otp_app: :pleroma,
      adapter: Ecto.Adapters.Postgres,
      migration_timestamps: [type: :naive_datetime_usec]

    import Ecto.Query
    require Logger

    @doc """
    Dynamically loads the repository url from the `DATABASE_URL` environment
    variable.

    When the variable is set, its value replaces any `:url` present in `opts`.
    When it is unset, `opts` is returned unchanged, so a `:url` supplied by the
    application configuration is not overwritten with `nil`.
    """
    @impl true
    @spec init(term(), Keyword.t()) :: {:ok, Keyword.t()}
    def init(_context, opts) do
      case System.get_env("DATABASE_URL") do
        # Nothing to inject: keep whatever the configuration already provides.
        nil -> {:ok, opts}
        url -> {:ok, Keyword.put(opts, :url, url)}
      end
    end

    @doc """
    Finds a single resource using a prepared query.

    The query is executed with `one/1`. Returns `{:ok, resource}` when a row is
    found, and `{:error, :not_found}` when the query yields `nil` or when the
    argument is not an `Ecto.Query` struct.
    """
    @spec find_resource(Ecto.Query.t()) :: {:ok, struct()} | {:error, :not_found}
    def find_resource(%Ecto.Query{} = query) do
      case __MODULE__.one(query) do
        nil -> {:error, :not_found}
        resource -> {:ok, resource}
      end
    end

    # Anything that is not a prepared query is reported as "not found".
    def find_resource(_query), do: {:error, :not_found}

    @doc """
    Gets an association of `resource`, loading it with `preload/2` if needed.

    Returns `{:ok, assoc}` when the preloaded value is not `nil`, and
    `{:error, :not_found}` otherwise.

    ## Examples

        iex> Repo.get_assoc(token, :user)
        {:ok, %User{}}

    """
    @spec get_assoc(struct(), atom()) :: {:ok, struct()} | {:error, :not_found}
    def get_assoc(resource, association) do
      case __MODULE__.preload(resource, association) do
        %{^association => assoc} when not is_nil(assoc) -> {:ok, assoc}
        _ -> {:error, :not_found}
      end
    end

    @doc """
    Returns a lazy enumerable that emits all entries from the data store
    matching the given query.

    Records are always fetched in chunks of at most `chunk_size`, ordered by
    ascending `id`. `returns_as` only controls what the stream emits:

      * `:one` (default) - every element is a single record
      * any other value, conventionally `:batches` - every element is the list
        of records that made up one chunk

    `query_options` is passed as the options argument of `all/2` for each chunk.

    Pagination starts at `id > 0` and resumes after the last `id` of the
    previous chunk. The `id` ordering is added on top of whatever ordering
    `query` already carries, so pass a query without its own `order_by`.

    ## Examples

        # emit records one-by-one
        iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)

        # emit records in lists of up to 500
        iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)

    """
    @spec chunk_stream(Ecto.Query.t(), integer()) :: Enumerable.t()
    @spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
    @spec chunk_stream(Ecto.Query.t(), integer(), atom(), Keyword.t()) :: Enumerable.t()
    def chunk_stream(query, chunk_size, returns_as \\ :one, query_options \\ []) do
      # We don't actually need start and end functions of resource streaming,
      # but it seems to be the only way to not fetch records one-by-one and
      # have individual records be the elements of the stream, instead of
      # lists of records
      Stream.resource(
        # The accumulator is the id of the last record emitted so far.
        fn -> 0 end,
        fn last_id ->
          query
          |> order_by(asc: :id)
          |> where([r], r.id > ^last_id)
          |> limit(^chunk_size)
          |> all(query_options)
          |> case do
            [] ->
              {:halt, last_id}

            records ->
              next_id = List.last(records).id
              # Stream.resource/3 flattens the returned list, so wrapping the
              # chunk in another list makes the whole chunk a single element.
              elements = if returns_as == :one, do: records, else: [records]
              {elements, next_id}
          end
        end,
        fn _last_id -> :ok end
      )
    end
  end