logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma
commit: 692919c7d260b85eb653089765d7f10425c06cf9
parent: dfd031c26adb78fbbbf44943a81ef506fbbd1324
Author: lambda <lain@soykaf.club>
Date:   Tue, 14 May 2019 15:27:34 +0000

Merge branch 'refactor/use-job-queue-everywhere' into 'develop'

use job queue everywhere

Closes #862

See merge request pleroma/pleroma!1142

Diffstat:

Mlib/pleroma/user.ex120+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Mlib/pleroma/web/activity_pub/activity_pub.ex4+---
Mlib/pleroma/web/rich_media/helpers.ex2++
Mlib/pleroma/web/twitter_api/controllers/util_controller.ex18++++++++++++++----
4 files changed, 89 insertions(+), 55 deletions(-)

diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex @@ -425,24 +425,6 @@ defmodule Pleroma.User do Enum.member?(follower.following, followed.follower_address) end - def follow_import(%User{} = follower, followed_identifiers) - when is_list(followed_identifiers) do - Enum.map( - followed_identifiers, - fn followed_identifier -> - with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier), - {:ok, follower} <- maybe_direct_follow(follower, followed), - {:ok, _} <- ActivityPub.follow(follower, followed) do - followed - else - err -> - Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}") - err - end - end - ) - end - def locked?(%User{} = user) do user.info.locked || false end @@ -564,8 +546,7 @@ defmodule Pleroma.User do with [_nick, _domain] <- String.split(nickname, "@"), {:ok, user} <- fetch_by_nickname(nickname) do if Pleroma.Config.get([:fetch_initial_posts, :enabled]) do - # TODO turn into job - {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user]) + fetch_initial_posts(user) end {:ok, user} @@ -576,15 +557,8 @@ defmodule Pleroma.User do end @doc "Fetch some posts when the user has just been federated with" - def fetch_initial_posts(user) do - pages = Pleroma.Config.get!([:fetch_initial_posts, :pages]) - - Enum.each( - # Insert all the posts in reverse order, so they're in the right order on the timeline - Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)), - &Pleroma.Web.Federator.incoming_ap_doc/1 - ) - end + def fetch_initial_posts(user), + do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() def get_followers_query(%User{} = user, nil) do @@ -866,23 +840,6 @@ defmodule Pleroma.User do |> restrict_deactivated() end - def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do - Enum.map( - blocked_identifiers, - fn blocked_identifier -> - with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier), - {:ok, blocker} <- block(blocker, blocked), - {:ok, _} <- ActivityPub.block(blocker, blocked) do - blocked - else - err -> - Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}") - err - end - end - ) - end - def mute(muter, %User{ap_id: ap_id}) do info_cng = muter.info @@ -1104,6 +1061,73 @@ defmodule Pleroma.User do delete_user_activities(user) end + @spec perform(atom(), User.t()) :: {:ok, User.t()} + def perform(:fetch_initial_posts, %User{} = user) do + pages = Pleroma.Config.get!([:fetch_initial_posts, :pages]) + + Enum.each( + # Insert all the posts in reverse order, so they're in the right order on the timeline + Enum.reverse(Utils.fetch_ordered_collection(user.info.source_data["outbox"], pages)), + &Pleroma.Web.Federator.incoming_ap_doc/1 + ) + + {:ok, user} + end + + @spec perform(atom(), User.t(), list()) :: list() | {:error, any()} + def perform(:blocks_import, %User{} = blocker, blocked_identifiers) + when is_list(blocked_identifiers) do + Enum.map( + blocked_identifiers, + fn blocked_identifier -> + with {:ok, %User{} = blocked} <- get_or_fetch(blocked_identifier), + {:ok, blocker} <- block(blocker, blocked), + {:ok, _} <- ActivityPub.block(blocker, blocked) do + blocked + else + err -> + Logger.debug("blocks_import failed for #{blocked_identifier} with: #{inspect(err)}") + err + end + end + ) + end + + @spec perform(atom(), User.t(), list()) :: list() | {:error, any()} + def perform(:follow_import, %User{} = follower, followed_identifiers) + when is_list(followed_identifiers) do + Enum.map( + followed_identifiers, + fn followed_identifier -> + with {:ok, %User{} = followed} <- get_or_fetch(followed_identifier), + {:ok, follower} <- maybe_direct_follow(follower, followed), + {:ok, _} <- ActivityPub.follow(follower, followed) do + followed + else + err -> + Logger.debug("follow_import failed for #{followed_identifier} with: #{inspect(err)}") + err + end + end + ) + end + + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), + do: + PleromaJobQueue.enqueue(:background, __MODULE__, [ + :blocks_import, + blocker, + blocked_identifiers + ]) + + def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), + do: + PleromaJobQueue.enqueue(:background, __MODULE__, [ + :follow_import, + follower, + followed_identifiers + ]) + def delete_user_activities(%User{ap_id: ap_id} = user) do stream = ap_id @@ -1157,8 +1181,8 @@ defmodule Pleroma.User do resp = fetch_by_ap_id(ap_id) if should_fetch_initial do - with {:ok, %User{} = user} = resp do - {:ok, _} = Task.start(__MODULE__, :fetch_initial_posts, [user]) + with {:ok, %User{} = user} <- resp do + fetch_initial_posts(user) end end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -133,9 +133,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - Task.start(fn -> - Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) - end) + PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) Notification.create_notifications(activity) diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex @@ -34,4 +34,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do end def fetch_data_for_activity(_), do: %{} + + def perform(:fetch, %Activity{} = activity), do: fetch_data_for_activity(activity) end diff --git a/lib/pleroma/web/twitter_api/controllers/util_controller.ex b/lib/pleroma/web/twitter_api/controllers/util_controller.ex @@ -309,8 +309,13 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do Enum.map(lines, fn line -> String.split(line, ",") |> List.first() end) - |> List.delete("Account address"), - {:ok, _} = Task.start(fn -> User.follow_import(follower, followed_identifiers) end) do + |> List.delete("Account address") do + PleromaJobQueue.enqueue(:background, User, [ + :follow_import, + follower, + followed_identifiers + ]) + json(conn, "job started") end end @@ -320,8 +325,13 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do end def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do - with blocked_identifiers <- String.split(list), - {:ok, _} = Task.start(fn -> User.blocks_import(blocker, blocked_identifiers) end) do + with blocked_identifiers <- String.split(list) do + PleromaJobQueue.enqueue(:background, User, [ + :blocks_import, + blocker, + blocked_identifiers + ]) + json(conn, "job started") end end