logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma
commit: 833161b5d21f85e2276cd0cee3e148ecbe6e1f05
parent: a39a094cdabe9c2497fbd5cc4947650aff933c0d
Author: kaniini <nenolod@gmail.com>
Date:   Mon, 18 Feb 2019 19:43:06 +0000

Merge branch 'feature/jobs' into 'develop'

Job Queue

See merge request pleroma/pleroma!732

Diffstat:

Mconfig/config.exs6++++--
Mconfig/test.exs2++
Mdocs/config.md31++++++++++++++++++++++++-------
Mlib/pleroma/application.ex5+++--
Alib/pleroma/jobs.ex152+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mlib/pleroma/web/activity_pub/activity_pub.ex22++++++++++------------
Mlib/pleroma/web/activity_pub/activity_pub_controller.ex4++--
Mlib/pleroma/web/activity_pub/utils.ex2+-
Mlib/pleroma/web/federator/federator.ex145+++++++++++++++++++++++++++++--------------------------------------------------
Mlib/pleroma/web/ostatus/ostatus_controller.ex2+-
Mlib/pleroma/web/salmon/salmon.ex2+-
Mlib/pleroma/web/websub/websub.ex7++++---
Mlib/pleroma/web/websub/websub_controller.ex2+-
Atest/jobs_test.exs83+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atest/support/jobs_worker_mock.ex19+++++++++++++++++++
Mtest/web/federator_test.exs38++++++++++----------------------------
16 files changed, 370 insertions(+), 152 deletions(-)

diff --git a/config/config.exs b/config/config.exs @@ -332,14 +332,16 @@ config :pleroma, Pleroma.User, "web" ] -config :pleroma, Pleroma.Web.Federator, max_jobs: 50 - config :pleroma, Pleroma.Web.Federator.RetryQueue, enabled: false, max_jobs: 20, initial_timeout: 30, max_retries: 5 +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/config/test.exs b/config/test.exs @@ -44,6 +44,8 @@ config :web_push_encryption, :vapid_details, "BLH1qVhJItRGCfxgTtONfsOKDc9VRAraXw-3NsmjMngWSh7NxOizN6bkuRA7iLTMPS82PjwJAr3UoK9EC1IFrz4", private_key: "_-XZ0iebPrRfZ_o0-IatTdszYa8VCH1yLN-JauK7HHA" +config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2] + try do import_config "test.secret.exs" rescue diff --git a/docs/config.md b/docs/config.md @@ -36,14 +36,15 @@ This filter replaces the filename (not the path) of an upload. For complete obfu An example for Sendgrid adapter: -``` +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.Sendgrid, api_key: "YOUR_API_KEY" ``` An example for SMTP adapter: -``` + +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.SMTP, relay: "smtp.gmail.com", @@ -209,7 +210,7 @@ their ActivityPub ID. An example: -``` +```exs config :pleroma, :mrf_user_allowlist, "example.org": ["https://example.org/users/admin"] ``` @@ -238,18 +239,34 @@ the source code is here: https://github.com/koto-bank/kocaptcha. The default end Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example: -``` +```exs config :pleroma, :admin_token, "somerandomtoken" ``` You can then do -``` + +```sh curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## Pleroma.Web.Federator +## Pleroma.Jobs + +A list of job queues and their settings. + +Job queue settings: + +* `max_jobs`: The maximum amount of parallel jobs running at the same time. + +Example: + +```exs +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] +``` + +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. ## Pleroma.Web.Federator.RetryQueue diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex @@ -108,9 +108,10 @@ defmodule Pleroma.Application do hackney_pool_children() ++ [ worker(Pleroma.Web.Federator.RetryQueue, []), - worker(Pleroma.Web.Federator, []), worker(Pleroma.Stats, []), - worker(Pleroma.Web.Push, []) + worker(Pleroma.Web.Push, []), + worker(Pleroma.Jobs, []), + worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) ] ++ streamer_child() ++ chat_child() ++ diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex @@ -0,0 +1,152 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs do + @moduledoc """ + A basic job queue + """ + use GenServer + + require Logger + + def init(args) do + {:ok, args} + end + + def start_link do + queues = + Pleroma.Config.get(Pleroma.Jobs) + |> Enum.map(fn {name, _} -> create_queue(name) end) + |> Enum.into(%{}) + + state = %{ + queues: queues, + refs: %{} + } + + GenServer.start_link(__MODULE__, state, name: __MODULE__) + end + + def create_queue(name) do + {name, {:sets.new(), []}} + end + + @doc """ + Enqueues a job. + + Returns `:ok`. + + ## Arguments + + - `queue_name` - a queue name(must be specified in the config). + - `mod` - a worker module (must have `perform` function). + - `args` - a list of arguments for the `perform` function of the worker module. + - `priority` - a job priority (`0` by default). + + ## Examples + + Enqueue `Module.perform/0` with `priority=1`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, []) + :ok + + Enqueue `Module.perform(:job_name)` with `priority=5`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5) + :ok + + Enqueue `Module.perform(:another_job, data)` with `priority=1`: + + iex> data = "foobar" + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data]) + :ok + + Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42]) + :ok + + """ + + def enqueue(queue_name, mod, args, priority \\ 1) + + if Mix.env() == :test do + def enqueue(_queue_name, mod, args, _priority) do + apply(mod, :perform, args) + end + else + @spec enqueue(atom(), atom(), [any()], integer()) :: :ok + def enqueue(queue_name, mod, args, priority) do + GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) + end + end + + def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do + {running_jobs, queue} = state[:queues][queue_name] + + queue = enqueue_sorted(queue, {mod, args}, priority) + + state = + state + |> update_queue(queue_name, {running_jobs, queue}) + |> maybe_start_job(queue_name, running_jobs, queue) + + {:noreply, state} + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do + queue_name = state.refs[ref] + + {running_jobs, queue} = state[:queues][queue_name] + + running_jobs = :sets.del_element(ref, running_jobs) + + state = + state + |> remove_ref(ref) + |> update_queue(queue_name, {running_jobs, queue}) + |> maybe_start_job(queue_name, running_jobs, queue) + + {:noreply, state} + end + + def maybe_start_job(state, queue_name, running_jobs, queue) do + if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) && + queue != [] do + {{mod, args}, queue} = queue_pop(queue) + {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end) + mref = Process.monitor(pid) + + state + |> add_ref(queue_name, mref) + |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) + else + state + end + end + + def enqueue_sorted(queue, element, priority) do + [%{item: element, priority: priority} | queue] + |> Enum.sort_by(fn %{priority: priority} -> priority end) + end + + def queue_pop([%{item: element} | queue]) do + {element, queue} + end + + defp add_ref(state, queue_name, ref) do + refs = Map.put(state[:refs], ref, queue_name) + Map.put(state, :refs, refs) + end + + defp remove_ref(state, ref) do + refs = Map.delete(state[:refs], ref) + Map.put(state, :refs, refs) + end + + defp update_queue(state, queue_name, data) do + queues = Map.put(state[:queues], queue_name, data) + Map.put(state, :queues, queues) + end +end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -757,21 +757,19 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do public = is_public?(activity) - reachable_inboxes_metadata = - (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> - Federator.enqueue(:publish_single_ap, %{ + (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) + |> Enum.filter(fn user -> User.ap_enabled?(user) end) + |> Enum.map(fn %{info: %{source_data: data}} -> + (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + |> Enum.each(fn {inbox, unreachable_since} -> + Federator.publish_single_ap(%{ inbox: inbox, json: json, actor: actor, diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -155,13 +155,13 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubController do with %User{} = user <- User.get_cached_by_nickname(nickname), true <- Utils.recipient_in_message(user.ap_id, params), params <- Utils.maybe_splice_recipient(user.ap_id, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end end def inbox(%{assigns: %{valid_signature: true}} = conn, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex @@ -164,7 +164,7 @@ defmodule Pleroma.Web.ActivityPub.Utils do _ -> 5 end - Pleroma.Web.Federator.enqueue(:publish, activity, priority) + Pleroma.Web.Federator.publish(activity, priority) :ok end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex @@ -3,8 +3,6 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - use GenServer - alias Pleroma.Activity alias Pleroma.User alias Pleroma.Web.WebFinger @@ -16,45 +14,71 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.OStatus + alias Pleroma.Jobs require Logger @websub Application.get_env(:pleroma, :websub) @ostatus Application.get_env(:pleroma, :ostatus) - def init(args) do - {:ok, args} + def init() do + # 1 minute + Process.sleep(1000 * 60 * 1) + refresh_subscriptions() end - def start_link do - spawn(fn -> - # 1 minute - Process.sleep(1000 * 60) - enqueue(:refresh_subscriptions, nil) - end) + # Client API + + def incoming_doc(doc) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + end + + def incoming_ap_doc(params) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + end + + def publish(activity, priority \\ 1) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + end + + def publish_single_ap(params) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) + end - GenServer.start_link( - __MODULE__, - %{ - in: {:sets.new(), []}, - out: {:sets.new(), []} - }, - name: __MODULE__ - ) + def publish_single_websub(websub) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub]) end - def handle(:refresh_subscriptions, _) do + def verify_websub(websub) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + end + + def request_subscription(sub) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + end + + def refresh_subscriptions() do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + end + + def publish_single_salmon(params) do + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) + end + + # Job Worker Callbacks + + def perform(:refresh_subscriptions) do Logger.debug("Federator running refresh subscriptions") Websub.refresh_subscriptions() spawn(fn -> # 6 hours Process.sleep(1000 * 60 * 60 * 6) - enqueue(:refresh_subscriptions, nil) + refresh_subscriptions() end) end - def handle(:request_subscription, websub) do + def perform(:request_subscription, websub) do Logger.debug("Refreshing #{websub.topic}") with {:ok, websub} <- Websub.request_subscription(websub) do @@ -64,7 +88,7 @@ defmodule Pleroma.Web.Federator do end end - def handle(:publish, activity) do + def perform(:publish, activity) do Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do @@ -90,7 +114,7 @@ defmodule Pleroma.Web.Federator do end end - def handle(:verify_websub, websub) do + def perform(:verify_websub, websub) do Logger.debug(fn -> "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" end) @@ -98,12 +122,12 @@ defmodule Pleroma.Web.Federator do @websub.verify(websub) end - def handle(:incoming_doc, doc) do + def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") @ostatus.handle_incoming(doc) end - def handle(:incoming_ap_doc, params) do + def perform(:incoming_ap_doc, params) do Logger.info("Handling incoming AP activity") params = Utils.normalize_params(params) @@ -128,11 +152,11 @@ defmodule Pleroma.Web.Federator do end end - def handle(:publish_single_salmon, params) do + def perform(:publish_single_salmon, params) do Salmon.send_to_user(params) end - def handle(:publish_single_ap, params) do + def perform(:publish_single_ap, params) do case ActivityPub.publish_one(params) do {:ok, _} -> :ok @@ -142,7 +166,7 @@ defmodule Pleroma.Web.Federator do end end - def handle( + def perform( :publish_single_websub, %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params ) do @@ -155,74 +179,11 @@ defmodule Pleroma.Web.Federator do end end - def handle(type, _) do + def perform(type, _) do Logger.debug(fn -> "Unknown task: #{type}" end) {:error, "Don't know what to do with this"} end - if Mix.env() == :test do - def enqueue(type, payload, _priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - handle(type, payload) - end - end - else - def enqueue(type, payload, priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) - end - end - end - - def maybe_start_job(running_jobs, queue) do - if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do - {{type, payload}, queue} = queue_pop(queue) - {:ok, pid} = Task.start(fn -> handle(type, payload) end) - mref = Process.monitor(pid) - {:sets.add_element(mref, running_jobs), queue} - else - {running_jobs, queue} - end - end - - def handle_cast({:enqueue, type, payload, _priority}, state) - when type in [:incoming_doc, :incoming_ap_doc] do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_queue = enqueue_sorted(i_queue, {type, payload}, 1) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast({:enqueue, type, payload, _priority}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - o_queue = enqueue_sorted(o_queue, {type, payload}, 1) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast(_, state) do - {:noreply, state} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_running_jobs = :sets.del_element(ref, i_running_jobs) - o_running_jobs = :sets.del_element(ref, o_running_jobs) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def enqueue_sorted(queue, element, priority) do - [%{item: element, priority: priority} | queue] - |> Enum.sort_by(fn %{priority: priority} -> priority end) - end - - def queue_pop([%{item: element} | queue]) do - {element, queue} - end - def ap_enabled_actor(id) do user = User.get_by_ap_id(id) diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -87,7 +87,7 @@ defmodule Pleroma.Web.OStatus.OStatusController do {:ok, body, _conn} = read_body(conn) {:ok, doc} = decode_or_retry(body) - Federator.enqueue(:incoming_doc, doc) + Federator.incoming_doc(doc) conn |> send_resp(200, "") diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex @@ -229,7 +229,7 @@ defmodule Pleroma.Web.Salmon do |> Enum.each(fn remote_user -> Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) - Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ + Pleroma.Web.Federator.publish_single_salmon(%{ recipient: remote_user, feed: feed, poster: poster, diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex @@ -13,6 +13,7 @@ defmodule Pleroma.Web.Websub do alias Pleroma.Web.Endpoint alias Pleroma.Web.OStatus alias Pleroma.Web.Router.Helpers + alias Pleroma.Web.Federator require Logger import Ecto.Query @@ -87,7 +88,7 @@ defmodule Pleroma.Web.Websub do unreachable_since: reachable_callbacks_metadata[sub.callback] } - Pleroma.Web.Federator.enqueue(:publish_single_websub, data) + Federator.publish_single_websub(data) end) end @@ -119,7 +120,7 @@ defmodule Pleroma.Web.Websub do websub = Repo.update!(change) - Pleroma.Web.Federator.enqueue(:verify_websub, websub) + Federator.verify_websub(websub) {:ok, websub} else @@ -269,7 +270,7 @@ defmodule Pleroma.Web.Websub do subs = Repo.all(query) Enum.each(subs, fn sub -> - Pleroma.Web.Federator.enqueue(:request_subscription, sub) + Federator.request_subscription(sub) end) end diff --git a/lib/pleroma/web/websub/websub_controller.ex b/lib/pleroma/web/websub/websub_controller.ex @@ -84,7 +84,7 @@ defmodule Pleroma.Web.Websub.WebsubController do %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id), {:ok, body, _conn} = read_body(conn), ^signature <- Websub.sign(websub.secret, body) do - Federator.enqueue(:incoming_doc, body) + Federator.incoming_doc(body) conn |> send_resp(200, "OK") diff --git a/test/jobs_test.exs b/test/jobs_test.exs @@ -0,0 +1,83 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.JobsTest do + use ExUnit.Case, async: true + + alias Pleroma.Jobs + alias Jobs.WorkerMock + + setup do + state = %{ + queues: Enum.into([Jobs.create_queue(:testing)], %{}), + refs: %{} + } + + [state: state] + end + + test "creates queue" do + queue = Jobs.create_queue(:foobar) + + assert {:foobar, set} = queue + assert :set == elem(set, 0) |> elem(0) + end + + test "enqueues an element according to priority" do + queue = [%{item: 1, priority: 2}] + + new_queue = Jobs.enqueue_sorted(queue, 2, 1) + assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}] + + new_queue = Jobs.enqueue_sorted(queue, 2, 3) + assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}] + end + + test "pop first item" do + queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}] + + assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue) + end + + test "enqueue a job", %{state: state} do + assert {:noreply, new_state} = + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + + assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state + assert :sets.size(running_jobs) == 1 + assert [ref] = :sets.to_list(running_jobs) + assert %{refs: %{^ref => :testing}} = new_state + end + + test "max jobs setting", %{state: state} do + max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs]) + + {:noreply, state} = + Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} -> + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + end) + + assert %{ + queues: %{ + testing: + {running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]} + } + } = state + + assert :sets.size(running_jobs) == max_jobs + end + + test "remove job after it finished", %{state: state} do + {:noreply, new_state} = + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + + %{queues: %{testing: {running_jobs, []}}} = new_state + [ref] = :sets.to_list(running_jobs) + + assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} = + Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state) + + assert :sets.size(running_jobs) == 0 + end +end diff --git a/test/support/jobs_worker_mock.ex b/test/support/jobs_worker_mock.ex @@ -0,0 +1,19 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs.WorkerMock do + require Logger + + def perform(:test_job, arg, arg2) do + Logger.debug({:perform, :test_job, arg, arg2}) + end + + def perform(:test_job, payload) do + Logger.debug({:perform, :test_job, payload}) + end + + def test_job(payload) do + Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload]) + end +end diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs @@ -15,22 +15,6 @@ defmodule Pleroma.Web.FederatorTest do :ok end - test "enqueues an element according to priority" do - queue = [%{item: 1, priority: 2}] - - new_queue = Federator.enqueue_sorted(queue, 2, 1) - assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - new_queue = Federator.enqueue_sorted(queue, 2, 3) - assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}] - end - - test "pop first item" do - queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - assert {2, [%{item: 1, priority: 2}]} = Federator.queue_pop(queue) - end - describe "Publish an activity" do setup do user = insert(:user) @@ -50,7 +34,7 @@ defmodule Pleroma.Web.FederatorTest do relay_mock: relay_mock } do with_mocks([relay_mock]) do - Federator.handle(:publish, activity) + Federator.publish(activity) end assert_received :relay_publish @@ -63,7 +47,7 @@ defmodule Pleroma.Web.FederatorTest do Pleroma.Config.put([:instance, :allow_relay], false) with_mocks([relay_mock]) do - Federator.handle(:publish, activity) + Federator.publish(activity) end refute_received :relay_publish @@ -104,11 +88,9 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Federator.enqueue(:publish_single_ap, %{inbox: inbox1, unreachable_since: dt}) - ) + assert called(Federator.publish_single_ap(%{inbox: inbox1, unreachable_since: dt})) - refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2})) + refute called(Federator.publish_single_ap(%{inbox: inbox2})) end test_with_mock "it federates only to reachable instances via Websub", @@ -140,13 +122,13 @@ defmodule Pleroma.Web.FederatorTest do {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) assert called( - Federator.enqueue(:publish_single_websub, %{ + Federator.publish_single_websub(%{ callback: sub2.callback, unreachable_since: dt }) ) - refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback})) + refute called(Federator.publish_single_websub(%{callback: sub1.callback})) end test_with_mock "it federates only to reachable instances via Salmon", @@ -180,13 +162,13 @@ defmodule Pleroma.Web.FederatorTest do CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) assert called( - Federator.enqueue(:publish_single_salmon, %{ + Federator.publish_single_salmon(%{ recipient: remote_user2, unreachable_since: dt }) ) - refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1})) + refute called(Federator.publish_single_websub(%{recipient: remote_user1})) end end @@ -206,7 +188,7 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - {:ok, _activity} = Federator.handle(:incoming_ap_doc, params) + {:ok, _activity} = Federator.incoming_ap_doc(params) end test "rejects incoming AP docs with incorrect origin" do @@ -224,7 +206,7 @@ defmodule Pleroma.Web.FederatorTest do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - :error = Federator.handle(:incoming_ap_doc, params) + :error = Federator.incoming_ap_doc(params) end end end