logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

scheduled_activity.ex (6234B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.ScheduledActivity do
  5. use Ecto.Schema
  6. alias Ecto.Multi
  7. alias Pleroma.Repo
  8. alias Pleroma.ScheduledActivity
  9. alias Pleroma.User
  10. alias Pleroma.Web.CommonAPI.Utils
  11. alias Pleroma.Workers.ScheduledActivityWorker
  12. import Ecto.Query
  13. import Ecto.Changeset
  @type t :: %__MODULE__{}

  # Minimum lead time required between "now" and `scheduled_at`.
  # `:timer.minutes/1` returns milliseconds, which is the unit `far_enough?/1`
  # uses for its comparison.
  @min_offset :timer.minutes(5)

  # Module used to read configuration values (`@config_impl.get/1`). It is
  # resolved at compile time from the `:pleroma` application environment and
  # falls back to `Pleroma.Config`.
  @config_impl Application.compile_env(:pleroma, [__MODULE__, :config_impl], Pleroma.Config)

  schema "scheduled_activities" do
    # Owner of the scheduled activity; the foreign key uses the FlakeId type.
    belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
    # Time at which the activity is due, stored without a time zone. The rest
    # of this module compares it against `NaiveDateTime.utc_now/0`.
    field(:scheduled_at, :naive_datetime)
    # Free-form parameters kept with the activity. `with_media_attachments/1`
    # reads "media_ids" from it and writes "media_attachments" into it.
    field(:params, :map)
    timestamps()
  end
  @doc """
  Builds the changeset used to create a scheduled activity.

  Casts and requires `:scheduled_at` and `:params`, validates the schedule time
  with `validate_scheduled_at/1` and, when the params carry a list under
  `"media_ids"`, stores the matching attachments under `"media_attachments"`.
  """
  def changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
    fields = [:scheduled_at, :params]

    validated =
      scheduled_activity
      |> cast(attrs, fields)
      |> validate_required(fields)
      |> validate_scheduled_at()

    with_media_attachments(validated)
  end
  # When the `:params` change holds a list under "media_ids", looks up the
  # attachments for the changeset's user and stores them in the params change
  # under "media_attachments". Any other changeset is returned untouched.
  defp with_media_attachments(
         %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
       )
       when is_list(media_ids) do
    owner = User.get_cached_by_id(changeset.data.user_id)
    attachments = Utils.attachments_from_ids(%{media_ids: media_ids}, owner)

    enriched_params =
      Map.merge(params, %{"media_attachments" => attachments, "media_ids" => media_ids})

    put_change(changeset, :params, enriched_params)
  end

  defp with_media_attachments(changeset), do: changeset
  @doc """
  Builds the changeset used to reschedule an existing scheduled activity.

  Only `:scheduled_at` is cast; it is required and checked with
  `validate_scheduled_at/1`.
  """
  def update_changeset(%ScheduledActivity{} = scheduled_activity, attrs) do
    rescheduled = cast(scheduled_activity, attrs, [:scheduled_at])

    rescheduled
    |> validate_required([:scheduled_at])
    |> validate_scheduled_at()
  end
  @doc """
  Validates a changed `:scheduled_at` value.

  The checks run in order and the first one that fails becomes the error on
  `:scheduled_at`: the time must be far enough in the future, and the owning
  user must be below both the daily and the total scheduling limits.
  """
  def validate_scheduled_at(changeset) do
    validate_change(changeset, :scheduled_at, fn _field, scheduled_at ->
      scheduled_at_errors(changeset, scheduled_at)
    end)
  end

  # Returns the error list expected by `validate_change/3`; it is empty when
  # every check passes. The user id is read from the changeset only once the
  # lead-time check has passed.
  defp scheduled_at_errors(changeset, scheduled_at) do
    cond do
      not far_enough?(scheduled_at) ->
        [scheduled_at: "must be at least 5 minutes from now"]

      exceeds_daily_user_limit?(changeset.data.user_id, scheduled_at) ->
        [scheduled_at: "daily limit exceeded"]

      exceeds_total_user_limit?(changeset.data.user_id) ->
        [scheduled_at: "total limit exceeded"]

      true ->
        []
    end
  end
  @doc """
  Returns `true` when the user already has at least `:daily_user_limit`
  scheduled activities on the same date as `scheduled_at`.
  """
  def exceeds_daily_user_limit?(user_id, scheduled_at) do
    same_day_query =
      from(sa in ScheduledActivity,
        where: sa.user_id == ^user_id,
        where: type(sa.scheduled_at, :date) == type(^scheduled_at, :date),
        select: count(sa.id)
      )

    same_day_count = Repo.one(same_day_query)

    same_day_count >= @config_impl.get([ScheduledActivity, :daily_user_limit])
  end
  @doc """
  Returns `true` when the user already has at least `:total_user_limit`
  scheduled activities overall.
  """
  def exceeds_total_user_limit?(user_id) do
    total_query =
      from(sa in ScheduledActivity,
        where: sa.user_id == ^user_id,
        select: count(sa.id)
      )

    total_count = Repo.one(total_query)

    total_count >= @config_impl.get([ScheduledActivity, :total_user_limit])
  end
  @doc """
  Tells whether `scheduled_at` lies more than the minimum offset (5 minutes)
  after the current UTC time.

  A binary is first cast to a naive datetime; when that cast does not succeed
  the result is `false`.
  """
  def far_enough?(scheduled_at) when is_binary(scheduled_at) do
    case Ecto.Type.cast(:naive_datetime, scheduled_at) do
      {:ok, parsed} -> far_enough?(parsed)
      _ -> false
    end
  end

  def far_enough?(scheduled_at) do
    current_time = NaiveDateTime.utc_now()

    NaiveDateTime.diff(scheduled_at, current_time, :millisecond) > @min_offset
  end
  @doc "Builds the creation changeset for a scheduled activity owned by the given user."
  def new(%User{id: user_id}, attrs) do
    blank_activity = %ScheduledActivity{user_id: user_id}

    changeset(blank_activity, attrs)
  end
  @doc """
  Inserts a new scheduled activity for `user` and, when the `:enabled` setting
  is `true`, also enqueues the job scheduled for its `scheduled_at` time.

  Both steps run inside a single transaction.
  """
  @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
  def create(%User{} = user, attrs) do
    insert_step = Multi.insert(Multi.new(), :scheduled_activity, new(user, attrs))
    enabled = @config_impl.get([ScheduledActivity, :enabled])

    insert_step
    |> maybe_add_jobs(enabled)
    |> Repo.transaction()
    |> transaction_response()
  end
  # Appends the Oban job insertion to the multi when the flag is exactly
  # `true`; for any other flag value the multi is returned unchanged.
  defp maybe_add_jobs(multi, true) do
    Multi.run(multi, :scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
      job =
        ScheduledActivityWorker.new(
          %{activity_id: activity.id},
          scheduled_at: activity.scheduled_at
        )

      Oban.insert(job)
    end)
  end

  defp maybe_add_jobs(multi, _enabled), do: multi
  @doc "Fetches the scheduled activity with the given id among those owned by `user`."
  def get(%User{} = user, scheduled_activity_id) do
    owned_activity_query =
      from(sa in ScheduledActivity,
        where: sa.user_id == ^user.id,
        where: sa.id == ^scheduled_activity_id
      )

    Repo.one(owned_activity_query)
  end
  @doc """
  Reschedules a scheduled activity.

  Builds the changeset with `update_changeset/2`. When it is valid, the
  activity and the `scheduled_at` of its linked jobs (see `job_query/1`) are
  updated in one transaction. When it is invalid, `{:error, changeset}` is
  returned without touching the database.
  """
  @spec update(ScheduledActivity.t(), map()) ::
          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
  def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
    # Branch on the changeset directly instead of wrapping it in a fake
    # `{:error, _}` tuple only to lean on `with`'s fall-through.
    case update_changeset(scheduled_activity, attrs) do
      %Ecto.Changeset{valid?: true} = changeset ->
        Multi.new()
        |> Multi.update(:scheduled_activity, changeset)
        |> Multi.update_all(:scheduled_job, job_query(id),
          set: [scheduled_at: get_field(changeset, :scheduled_at)]
        )
        |> Repo.transaction()
        |> transaction_response()

      invalid_changeset ->
        {:error, invalid_changeset}
    end
  end
  @doc """
  Deletes a ScheduledActivity and linked jobs.

  Accepts either the struct or its id (binary or integer); both deletions run
  in one transaction.
  """
  @spec delete(ScheduledActivity.t() | binary() | integer) ::
          {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
  def delete(%ScheduledActivity{id: id} = scheduled_activity) do
    deletion =
      Multi.new()
      |> Multi.delete(:scheduled_activity, scheduled_activity, stale_error_field: :id)
      |> Multi.delete_all(:jobs, job_query(id))

    deletion
    |> Repo.transaction()
    |> transaction_response()
  end

  def delete(id) when is_binary(id) or is_integer(id) do
    placeholder = %__MODULE__{id: id}

    delete(placeholder)
  end
  # Reduces the result of `Repo.transaction/1` on one of this module's multis to
  # the public return shape: the `:scheduled_activity` step result on success,
  # or the failed step's value on error.
  defp transaction_response(outcome) do
    case outcome do
      {:error, _failed_step, failure, _changes_so_far} -> {:error, failure}
      {:ok, %{scheduled_activity: activity}} -> {:ok, activity}
    end
  end
  @doc "Returns the query selecting the scheduled activities whose `user_id` is the id of `user`."
  def for_user_query(%User{} = user) do
    from(sa in ScheduledActivity, where: sa.user_id == ^user.id)
  end
  @doc """
  Lists the scheduled activities whose `scheduled_at` is earlier than the
  current UTC time shifted by `offset` milliseconds (default `0`).
  """
  def due_activities(offset \\ 0) do
    cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), offset, :millisecond)

    due_query = from(sa in ScheduledActivity, where: sa.scheduled_at < ^cutoff)

    Repo.all(due_query)
  end
  @doc """
  Builds the query matching the Oban jobs in the "scheduled_activities" queue
  whose `activity_id` argument equals `scheduled_activity_id`.

  The id is converted with `to_string/1` and compared as text.
  """
  def job_query(scheduled_activity_id) do
    activity_id = to_string(scheduled_activity_id)

    from(j in Oban.Job,
      where: j.queue == "scheduled_activities",
      # Reference the column through the binding rather than a bare `args`, so
      # it stays unambiguous if this query is ever joined with another table
      # that also has an `args` column.
      where: fragment("? ->> 'activity_id' = ?::text", j.args, ^activity_id)
    )
  end
  175. end