logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/
commit: 32bc8ec5805fe45ef8f976f68539a4119a3989e5
parent d1d7dd18277985fbfb3df25c9f49630bb5537321
Author: nicole mikołajczyk <git@mkljczk.pl>
Date:   Sun,  7 Sep 2025 23:12:31 +0200

Stream marker updates

Signed-off-by: nicole mikołajczyk <git@mkljczk.pl>

Diffstat:

Achangelog.d/stream-marker-updates.add1+
Mlib/pleroma/notification.ex13+++++++++----
Mlib/pleroma/web/streamer.ex11+++++++++++
Mlib/pleroma/web/views/streamer_view.ex14++++++++++++++
Mtest/pleroma/notification_test.exs35+++++++++++++++++++++++++++++++++--
5 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/changelog.d/stream-marker-updates.add b/changelog.d/stream-marker-updates.add @@ -0,0 +1 @@ +Stream marker updates diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex @@ -281,10 +281,15 @@ defmodule Pleroma.Notification do select: n.id ) - Multi.new() - |> Multi.update_all(:ids, query, set: [seen: true, updated_at: NaiveDateTime.utc_now()]) - |> Marker.multi_set_last_read_id(user, "notifications") - |> Repo.transaction() + {:ok, %{marker: marker}} = + Multi.new() + |> Multi.update_all(:ids, query, set: [seen: true, updated_at: NaiveDateTime.utc_now()]) + |> Marker.multi_set_last_read_id(user, "notifications") + |> Repo.transaction() + + Streamer.stream(["user", "user:notification"], marker) + + {:ok, %{marker: marker}} end @spec read_one(User.t(), String.t()) :: diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Chat.MessageReference alias Pleroma.Config alias Pleroma.Conversation.Participation + alias Pleroma.Marker alias Pleroma.Notification alias Pleroma.Object alias Pleroma.User @@ -321,6 +322,16 @@ defmodule Pleroma.Web.Streamer do end) end + defp do_stream(topic, %Marker{} = marker) do + Registry.dispatch(@registry, "#{topic}:#{marker.user_id}", fn list -> + Enum.each(list, fn {pid, _auth} -> + text = StreamerView.render("marker.json", marker) + + send(pid, {:text, text}) + end) + end) + end + defp do_stream(topic, item) do Logger.debug("Trying to push to #{topic}") Logger.debug("Pushing item to #{topic}") diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex @@ -7,6 +7,7 @@ defmodule Pleroma.Web.StreamerView do alias Pleroma.Activity alias Pleroma.Conversation.Participation + alias Pleroma.Marker alias Pleroma.Notification alias Pleroma.User alias Pleroma.Web.MastodonAPI.NotificationView @@ -164,6 +165,19 @@ defmodule Pleroma.Web.StreamerView do |> Jason.encode!() end + def render("marker.json", %Marker{} = marker) do + %{ + event: "marker", + payload: + Pleroma.Web.MastodonAPI.MarkerView.render( + "markers.json", + markers: [marker] + ) + |> Jason.encode!() + } + |> Jason.encode!() + end + def render("pleroma_respond.json", %{type: type, result: result} = params) do %{ event: "pleroma:respond", diff --git a/test/pleroma/notification_test.exs b/test/pleroma/notification_test.exs @@ -17,6 +17,7 @@ defmodule Pleroma.NotificationTest do alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.CommonAPI alias Pleroma.Web.MastodonAPI.NotificationView + alias Pleroma.Web.Streamer setup do Mox.stub_with(Pleroma.UnstubbedConfigMock, Pleroma.Test.StaticConfig) @@ -446,8 +447,7 @@ defmodule Pleroma.NotificationTest do describe "set_read_up_to()" do test "it sets all notifications as read up to a specified notification ID" do - user = insert(:user) - other_user = insert(:user) + [user, other_user] = insert_pair(:user) {:ok, _activity} = CommonAPI.post(user, %{ @@ -486,6 +486,37 @@ defmodule Pleroma.NotificationTest do assert m.last_read_id == to_string(n2.id) end + + @tag needs_streamer: true + test "it send updated marker to the 'user' and the 'user:notification' stream" do + %{user: user, token: oauth_token} = oauth_access(["read"]) + other_user = insert(:user) + + {:ok, _activity} = + CommonAPI.post(other_user, %{ + status: "hi @#{user.nickname}!" + }) + + [%{id: notification_id}] = Notification.for_user(user) + + notification_id = to_string(notification_id) + + task = + Task.async(fn -> + {:ok, _topic} = + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) + + assert_receive {:text, event}, 4_000 + + assert %{"event" => "marker", "payload" => payload} = Jason.decode!(event) + + assert %{"notifications" => %{"last_read_id" => ^notification_id}} = + Jason.decode!(payload) + end) + + Notification.set_read_up_to(user, notification_id) + Task.await(task) + end end describe "for_user_since/2" do