commit: c077a14ce1343f5515fa11938df7d808f23a566c
parent 2380ae6dcc267d7d6ff81a55ae95eed718176563
Author: Mark Felder <feld@feld.me>
Date: Mon, 30 Sep 2024 13:54:56 -0400
Add Oban job to handle poll refreshing and stream out the update
Diffstat:
2 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex b/lib/pleroma/web/mastodon_api/controllers/poll_controller.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
alias Pleroma.Activity
alias Pleroma.Object
+ alias Pleroma.Workers.PollWorker
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Plugs.OAuthScopesPlug
@@ -33,6 +34,9 @@ defmodule Pleroma.Web.MastodonAPI.PollController do
with %Object{} = object <- Object.get_by_id(id),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]),
true <- Visibility.visible_for_user?(activity, user) do
+ PollWorker.new(%{"op" => "refresh", "activity_id" => activity.id})
+ |> Oban.insert(unique: [period: 60])
+
try_render(conn, "show.json", %{object: object, for: user})
else
error when is_nil(error) or error == false ->
diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex
@@ -11,27 +11,41 @@ defmodule Pleroma.Workers.PollWorker do
alias Pleroma.Activity
alias Pleroma.Notification
alias Pleroma.Object
+ alias Pleroma.Object.Fetcher
+
+ @stream_out_impl Pleroma.Config.get(
+ [__MODULE__, :stream_out],
+ Pleroma.Web.ActivityPub.ActivityPub
+ )
@impl true
def perform(%Job{args: %{"op" => "poll_end", "activity_id" => activity_id}}) do
- with %Activity{} = activity <- find_poll_activity(activity_id),
+ with {_, %Activity{} = activity} <- {:activity, Activity.get_by_id(activity_id)},
{:ok, notifications} <- Notification.create_poll_notifications(activity) do
Notification.stream(notifications)
else
- {:error, :poll_activity_not_found} = e -> {:cancel, e}
+ {:activity, nil} -> {:cancel, :poll_activity_not_found}
e -> {:error, e}
end
end
- @impl true
- def timeout(_job), do: :timer.seconds(5)
+ def perform(%Job{args: %{"op" => "refresh", "activity_id" => activity_id}}) do
+ with {_, %Activity{object: object}} <-
+ {:activity, Activity.get_by_id_with_object(activity_id)},
+ {_, {:ok, _object}} <- {:refetch, Fetcher.refetch_object(object)} do
+ stream_update(activity_id)
- defp find_poll_activity(activity_id) do
- with nil <- Activity.get_by_id(activity_id) do
- {:error, :poll_activity_not_found}
+ :ok
+ else
+ {:activity, nil} -> {:cancel, :poll_activity_not_found}
+ {:refetch, _} = e -> {:cancel, e}
+ e -> {:error, e}
end
end
+ @impl true
+ def timeout(_job), do: :timer.seconds(5)
+
def schedule_poll_end(%Activity{data: %{"type" => "Create"}, id: activity_id} = activity) do
with %Object{data: %{"type" => "Question", "closed" => closed}} when is_binary(closed) <-
Object.normalize(activity),
@@ -49,4 +63,10 @@ defmodule Pleroma.Workers.PollWorker do
end
def schedule_poll_end(activity), do: {:error, activity}
+
+ defp stream_update(activity_id) do
+ Activity.get_by_id(activity_id)
+ |> Activity.normalize()
+ |> @stream_out_impl.stream_out()
+ end
end