commit: a9aa810d3dadaac5a40d18f56ab41b6276206db1
parent 649e51b581327eb34d31e0160ea70d1cba281f9a
Author: Mark Felder <feld@feld.me>
Date: Thu, 22 Aug 2024 12:49:32 -0400
Change imports to generate an Oban job per each task
Diffstat:
6 files changed, 170 insertions(+), 109 deletions(-)
diff --git a/changelog.d/user-imports.fix b/changelog.d/user-imports.fix
@@ -1 +1 @@
-Imports of blocks, mutes, and following would retry until Oban runs out of attempts due to incorrect return value being considered an error.
+Imports of blocks, mutes, and follows would retry repeatedly due to incorrect error handling and all work executed in a single job
diff --git a/lib/pleroma/user/import.ex b/lib/pleroma/user/import.ex
@@ -5,6 +5,7 @@
defmodule Pleroma.User.Import do
use Ecto.Schema
+ alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.BackgroundWorker
@@ -12,80 +13,103 @@ defmodule Pleroma.User.Import do
require Logger
@spec perform(atom(), User.t(), list()) :: :ok | list() | {:error, any()}
- def perform(:mutes_import, %User{} = user, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = muted_user} <- User.get_or_fetch(identifier),
- {:ok, _} <- User.mute(user, muted_user) do
- {:ok, muted_user}
- else
- error -> handle_error(:mutes_import, identifier, error)
- end
- end
- )
+ def perform(:mute_import, %User{} = user, actor) do
+ with {:ok, %User{} = muted_user} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_mute, User.mutes_user?(user, muted_user)},
+ {:ok, _} <- User.mute(user, muted_user),
+ # User.mute/2 returns a FollowingRelationship not a %User{} like we get
+ # from CommonAPI.block/2 or CommonAPI.follow/2, so we fetch again to
+ # return the target actor for consistency
+ {:ok, muted_user} <- User.get_or_fetch(actor) do
+ {:ok, muted_user}
+ else
+ {:existing_mute, true} -> :ok
+ error -> handle_error(:mutes_import, actor, error)
+ end
end
- def perform(:blocks_import, %User{} = blocker, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = blocked} <- User.get_or_fetch(identifier),
- {:ok, _block} <- CommonAPI.block(blocked, blocker) do
- {:ok, blocked}
- else
- error -> handle_error(:blocks_import, identifier, error)
- end
- end
- )
+ def perform(:block_import, %User{} = user, actor) do
+ with {:ok, %User{} = blocked} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_block, User.blocks_user?(user, blocked)},
+ {:ok, _block} <- CommonAPI.block(blocked, user) do
+ {:ok, blocked}
+ else
+ {:existing_block, true} -> :ok
+ error -> handle_error(:blocks_import, actor, error)
+ end
end
- def perform(:follow_import, %User{} = follower, [_ | _] = identifiers) do
- Enum.map(
- identifiers,
- fn identifier ->
- with {:ok, %User{} = followed} <- User.get_or_fetch(identifier),
- {:ok, follower, followed} <- User.maybe_direct_follow(follower, followed),
- {:ok, _, _, _} <- CommonAPI.follow(followed, follower) do
- {:ok, followed}
- else
- error -> handle_error(:follow_import, identifier, error)
- end
- end
- )
+ def perform(:follow_import, %User{} = user, actor) do
+ with {:ok, %User{} = followed} <- User.get_or_fetch(actor),
+ {_, false} <- {:existing_follow, User.following?(user, followed)},
+ {:ok, user, followed} <- User.maybe_direct_follow(user, followed),
+ {:ok, _, _, _} <- CommonAPI.follow(followed, user) do
+ {:ok, followed}
+ else
+ {:existing_follow, true} -> :ok
+ error -> handle_error(:follow_import, actor, error)
+ end
end
- def perform(_, _, _), do: :ok
-
defp handle_error(op, user_id, error) do
Logger.debug("#{op} failed for #{user_id} with: #{inspect(error)}")
error
end
- def blocks_import(%User{} = blocker, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "blocks_import",
- "user_id" => blocker.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def blocks_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "block_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
- def follow_import(%User{} = follower, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "follow_import",
- "user_id" => follower.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def follows_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "follow_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
- def mutes_import(%User{} = user, [_ | _] = identifiers) do
- BackgroundWorker.new(%{
- "op" => "mutes_import",
- "user_id" => user.id,
- "identifiers" => identifiers
- })
- |> Oban.insert()
+ def mutes_import(%User{} = user, [_ | _] = actors) do
+ jobs =
+ Repo.checkout(fn ->
+ Enum.reduce(actors, [], fn actor, acc ->
+ {:ok, job} =
+ BackgroundWorker.new(%{
+ "op" => "mute_import",
+ "user_id" => user.id,
+ "actor" => actor
+ })
+ |> Oban.insert()
+
+ acc ++ [job]
+ end)
+ end)
+
+ {:ok, jobs}
end
end
diff --git a/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex b/lib/pleroma/web/pleroma_api/controllers/user_import_controller.ex
@@ -38,8 +38,8 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
|> Enum.map(&(&1 |> String.trim() |> String.trim_leading("@")))
|> Enum.reject(&(&1 == ""))
- User.Import.follow_import(follower, identifiers)
- json(conn, "job started")
+ User.Import.follows_import(follower, identifiers)
+ json(conn, "jobs started")
end
def blocks(
@@ -55,7 +55,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
defp do_block(%{assigns: %{user: blocker}} = conn, list) do
User.Import.blocks_import(blocker, prepare_user_identifiers(list))
- json(conn, "job started")
+ json(conn, "jobs started")
end
def mutes(
@@ -71,7 +71,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportController do
defp do_mute(%{assigns: %{user: user}} = conn, list) do
User.Import.mutes_import(user, prepare_user_identifiers(list))
- json(conn, "job started")
+ json(conn, "jobs started")
end
defp prepare_user_identifiers(list) do
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
@@ -19,10 +19,10 @@ defmodule Pleroma.Workers.BackgroundWorker do
User.perform(:force_password_reset, user)
end
- def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => identifiers}})
- when op in ["blocks_import", "follow_import", "mutes_import"] do
+ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "actor" => actor}})
+ when op in ["block_import", "follow_import", "mute_import"] do
user = User.get_cached_by_id(user_id)
- {:ok, User.Import.perform(String.to_existing_atom(op), user, identifiers)}
+ User.Import.perform(String.to_existing_atom(op), user, actor)
end
def perform(%Job{
diff --git a/test/pleroma/user/import_test.exs b/test/pleroma/user/import_test.exs
@@ -25,11 +25,12 @@ defmodule Pleroma.User.ImportTest do
user3.nickname
]
- {:ok, job} = User.Import.follow_import(user1, identifiers)
+ {:ok, jobs} = User.Import.follows_import(user1, identifiers)
+
+ for job <- jobs do
+ assert {:ok, %User{}} = ObanHelpers.perform(job)
+ end
- assert {:ok, result} = ObanHelpers.perform(job)
- assert is_list(result)
- assert result == [{:ok, refresh_record(user2)}, {:ok, refresh_record(user3)}]
assert User.following?(user1, user2)
assert User.following?(user1, user3)
end
@@ -44,11 +45,12 @@ defmodule Pleroma.User.ImportTest do
user3.nickname
]
- {:ok, job} = User.Import.blocks_import(user1, identifiers)
+ {:ok, jobs} = User.Import.blocks_import(user1, identifiers)
+
+ for job <- jobs do
+ assert {:ok, %User{}} = ObanHelpers.perform(job)
+ end
- assert {:ok, result} = ObanHelpers.perform(job)
- assert is_list(result)
- assert result == [{:ok, user2}, {:ok, user3}]
assert User.blocks?(user1, user2)
assert User.blocks?(user1, user3)
end
@@ -63,11 +65,12 @@ defmodule Pleroma.User.ImportTest do
user3.nickname
]
- {:ok, job} = User.Import.mutes_import(user1, identifiers)
+ {:ok, jobs} = User.Import.mutes_import(user1, identifiers)
+
+ for job <- jobs do
+ assert {:ok, %User{}} = ObanHelpers.perform(job)
+ end
- assert {:ok, result} = ObanHelpers.perform(job)
- assert is_list(result)
- assert result == [{:ok, user2}, {:ok, user3}]
assert User.mutes?(user1, user2)
assert User.mutes?(user1, user3)
end
diff --git a/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs b/test/pleroma/web/pleroma_api/controllers/user_import_controller_test.exs
@@ -22,7 +22,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
test "it returns HTTP 200", %{conn: conn} do
user2 = insert(:user)
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{"list" => "#{user2.ap_id}"})
@@ -38,7 +38,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
"Account address,Show boosts\n#{user2.ap_id},true"
end}
]) do
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{
@@ -46,9 +46,9 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == [refresh_record(user2)]
- assert [%Pleroma.User{follower_count: 1}] = job_result
+ assert [{:ok, updated_user}] = ObanHelpers.perform_all()
+ assert updated_user.id == user2.id
+ assert updated_user.follower_count == 1
end
end
@@ -63,7 +63,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
})
|> json_response_and_validate_schema(200)
- assert response == "job started"
+ assert response == "jobs started"
end
test "requires 'follow' or 'write:follows' permissions" do
@@ -102,14 +102,20 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
]
|> Enum.join("\n")
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/follow_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == Enum.map(users, &refresh_record/1)
+ results = ObanHelpers.perform_all()
+
+ returned_users =
+ for {_, returned_user} <- results do
+ returned_user
+ end
+
+ assert returned_users == Enum.map(users, &refresh_record/1)
end
end
@@ -120,7 +126,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
test "it returns HTTP 200", %{conn: conn} do
user2 = insert(:user)
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{"list" => "#{user2.ap_id}"})
@@ -133,7 +139,7 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
with_mocks([
{File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
]) do
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{
@@ -141,8 +147,14 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == users
+ results = ObanHelpers.perform_all()
+
+ returned_users =
+ for {_, returned_user} <- results do
+ returned_user
+ end
+
+ assert returned_users == users
end
end
@@ -159,14 +171,25 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
]
|> Enum.join(" ")
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/blocks_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == users
+ results = ObanHelpers.perform_all()
+
+ returned_user_ids =
+ for {_, user} <- results do
+ user.id
+ end
+
+ original_user_ids =
+ for user <- users do
+ user.id
+ end
+
+ assert match?(^original_user_ids, returned_user_ids)
end
end
@@ -177,24 +200,25 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
test "it returns HTTP 200", %{user: user, conn: conn} do
user2 = insert(:user)
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{"list" => "#{user2.ap_id}"})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == [user2]
+ [{:ok, result_user}] = ObanHelpers.perform_all()
+
+ assert result_user == refresh_record(user2)
assert Pleroma.User.mutes?(user, user2)
end
test "it imports mutes users from file", %{user: user, conn: conn} do
- users = [user2, user3] = insert_list(2, :user)
+ [user2, user3] = insert_list(2, :user)
with_mocks([
{File, [], read!: fn "mutes_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
]) do
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{
@@ -202,14 +226,19 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == users
- assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
+ results = ObanHelpers.perform_all()
+
+ returned_users =
+ for {_, returned_user} <- results do
+ returned_user
+ end
+
+ assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1))
end
end
test "it imports mutes with different nickname variations", %{user: user, conn: conn} do
- users = [user2, user3, user4, user5, user6] = insert_list(5, :user)
+ [user2, user3, user4, user5, user6] = insert_list(5, :user)
identifiers =
[
@@ -221,15 +250,20 @@ defmodule Pleroma.Web.PleromaAPI.UserImportControllerTest do
]
|> Enum.join(" ")
- assert "job started" ==
+ assert "jobs started" ==
conn
|> put_req_header("content-type", "application/json")
|> post("/api/pleroma/mutes_import", %{"list" => identifiers})
|> json_response_and_validate_schema(200)
- assert [{:ok, job_result}] = ObanHelpers.perform_all()
- assert job_result == users
- assert Enum.all?(users, &Pleroma.User.mutes?(user, &1))
+ results = ObanHelpers.perform_all()
+
+ returned_users =
+ for {_, returned_user} <- results do
+ returned_user
+ end
+
+ assert Enum.all?(returned_users, &Pleroma.User.mutes?(user, &1))
end
end
end