commit: ba8b5682ccd77a819f644e714deab217195b4879
parent 367d5c65f6b5ad34833ff82104b69ac0dd1186d1
Author: nicole mikołajczyk <me@mkljczk.pl>
Date: Sat, 29 Nov 2025 16:49:29 +0100
Merge branch 'stream-marker-updates' into 'develop'
Stream marker updates
See merge request pleroma/pleroma!4354
Diffstat:
6 files changed, 69 insertions(+), 7 deletions(-)
diff --git a/changelog.d/stream-marker-updates.add b/changelog.d/stream-marker-updates.add
@@ -0,0 +1 @@
+Stream marker updates
diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex
@@ -282,10 +282,15 @@ defmodule Pleroma.Notification do
select: n.id
)
- Multi.new()
- |> Multi.update_all(:ids, query, set: [seen: true, updated_at: NaiveDateTime.utc_now()])
- |> Marker.multi_set_last_read_id(user, "notifications")
- |> Repo.transaction()
+ {:ok, %{marker: marker}} =
+ Multi.new()
+ |> Multi.update_all(:ids, query, set: [seen: true, updated_at: NaiveDateTime.utc_now()])
+ |> Marker.multi_set_last_read_id(user, "notifications")
+ |> Repo.transaction()
+
+ Streamer.stream(["user", "user:notification"], marker)
+
+ {:ok, %{marker: marker}}
end
@spec read_one(User.t(), String.t()) ::
diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex
@@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Chat.MessageReference
alias Pleroma.Config
alias Pleroma.Conversation.Participation
+ alias Pleroma.Marker
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
@@ -321,6 +322,16 @@ defmodule Pleroma.Web.Streamer do
end)
end
+ defp do_stream(topic, %Marker{} = marker) do
+ Registry.dispatch(@registry, "#{topic}:#{marker.user_id}", fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ text = StreamerView.render("marker.json", marker)
+
+ send(pid, {:text, text})
+ end)
+ end)
+ end
+
defp do_stream(topic, item) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex
@@ -7,6 +7,7 @@ defmodule Pleroma.Web.StreamerView do
alias Pleroma.Activity
alias Pleroma.Conversation.Participation
+ alias Pleroma.Marker
alias Pleroma.Notification
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
@@ -164,6 +165,19 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
+ def render("marker.json", %Marker{} = marker) do
+ %{
+ event: "marker",
+ payload:
+ Pleroma.Web.MastodonAPI.MarkerView.render(
+ "markers.json",
+ markers: [marker]
+ )
+ |> Jason.encode!()
+ }
+ |> Jason.encode!()
+ end
+
def render("pleroma_respond.json", %{type: type, result: result} = params) do
%{
event: "pleroma:respond",
diff --git a/test/pleroma/notification_test.exs b/test/pleroma/notification_test.exs
@@ -17,6 +17,7 @@ defmodule Pleroma.NotificationTest do
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.MastodonAPI.NotificationView
+ alias Pleroma.Web.Streamer
setup do
Mox.stub_with(Pleroma.UnstubbedConfigMock, Pleroma.Test.StaticConfig)
@@ -446,8 +447,7 @@ defmodule Pleroma.NotificationTest do
describe "set_read_up_to()" do
test "it sets all notifications as read up to a specified notification ID" do
- user = insert(:user)
- other_user = insert(:user)
+ [user, other_user] = insert_pair(:user)
{:ok, _activity} =
CommonAPI.post(user, %{
@@ -486,6 +486,37 @@ defmodule Pleroma.NotificationTest do
assert m.last_read_id == to_string(n2.id)
end
+
+ @tag needs_streamer: true
+ test "it sends updated marker to the 'user' and the 'user:notification' stream" do
+ %{user: user, token: oauth_token} = oauth_access(["read"])
+ other_user = insert(:user)
+
+ {:ok, _activity} =
+ CommonAPI.post(other_user, %{
+ status: "hi @#{user.nickname}!"
+ })
+
+ [%{id: notification_id}] = Notification.for_user(user)
+
+ notification_id = to_string(notification_id)
+
+ task =
+ Task.async(fn ->
+ {:ok, _topic} =
+ Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
+
+ assert_receive {:text, event}, 4_000
+
+ assert %{"event" => "marker", "payload" => payload} = Jason.decode!(event)
+
+ assert %{"notifications" => %{"last_read_id" => ^notification_id}} =
+ Jason.decode!(payload)
+ end)
+
+ Notification.set_read_up_to(user, notification_id)
+ Task.await(task)
+ end
end
describe "for_user_since/2" do
diff --git a/test/pleroma/web/streamer_test.exs b/test/pleroma/web/streamer_test.exs
@@ -883,7 +883,7 @@ defmodule Pleroma.Web.StreamerTest do
assert Streamer.filtered_by_user?(user1, notif)
end
- test "it send non-reblog notification for reblog-muted actors", %{
+ test "it sends non-reblog notification for reblog-muted actors", %{
user: user1,
token: user1_token
} do