logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

streamer_test.exs (39740B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.StreamerTest do
  5. use Pleroma.DataCase
  6. import Pleroma.Factory
  7. alias Pleroma.Chat
  8. alias Pleroma.Chat.MessageReference
  9. alias Pleroma.Conversation.Participation
  10. alias Pleroma.List
  11. alias Pleroma.Object
  12. alias Pleroma.User
  13. alias Pleroma.Web.CommonAPI
  14. alias Pleroma.Web.Streamer
  15. alias Pleroma.Web.StreamerView
  16. @moduletag needs_streamer: true, capture_log: true
  17. setup do: clear_config([:instance, :skip_thread_containment])
  18. describe "get_topic/_ (unauthenticated)" do
  19. test "allows no stream" do
  20. assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
  21. end
  22. test "allows public" do
  23. assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
  24. assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
  25. assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
  26. assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
  27. end
  28. test "rejects local public streams if restricted_unauthenticated is on" do
  29. clear_config([:restrict_unauthenticated, :timelines, :local], true)
  30. assert {:error, :unauthorized} = Streamer.get_topic("public:local", nil, nil)
  31. assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", nil, nil)
  32. end
  33. test "rejects remote public streams if restricted_unauthenticated is on" do
  34. clear_config([:restrict_unauthenticated, :timelines, :federated], true)
  35. assert {:error, :unauthorized} = Streamer.get_topic("public", nil, nil)
  36. assert {:error, :unauthorized} = Streamer.get_topic("public:media", nil, nil)
  37. assert {:error, :unauthorized} =
  38. Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
  39. assert {:error, :unauthorized} =
  40. Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
  41. end
  42. test "allows instance streams" do
  43. assert {:ok, "public:remote:lain.com"} =
  44. Streamer.get_topic("public:remote", nil, nil, %{"instance" => "lain.com"})
  45. assert {:ok, "public:remote:media:lain.com"} =
  46. Streamer.get_topic("public:remote:media", nil, nil, %{"instance" => "lain.com"})
  47. end
  48. test "allows hashtag streams" do
  49. assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
  50. end
  51. test "disallows user streams" do
  52. assert {:error, _} = Streamer.get_topic("user", nil, nil)
  53. assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
  54. assert {:error, _} = Streamer.get_topic("direct", nil, nil)
  55. end
  56. test "disallows list streams" do
  57. assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
  58. end
  59. end
  60. describe "get_topic/_ (authenticated)" do
  61. setup do: oauth_access(["read"])
  62. test "allows public streams (regardless of OAuth token scopes)", %{
  63. user: user,
  64. token: read_oauth_token
  65. } do
  66. with oauth_token <- [nil, read_oauth_token] do
  67. assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
  68. assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
  69. assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
  70. assert {:ok, "public:local:media"} =
  71. Streamer.get_topic("public:local:media", user, oauth_token)
  72. end
  73. end
  74. test "allows local public streams if restricted_unauthenticated is on", %{
  75. user: user,
  76. token: oauth_token
  77. } do
  78. clear_config([:restrict_unauthenticated, :timelines, :local], true)
  79. %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
  80. %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
  81. assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
  82. assert {:ok, "public:local:media"} =
  83. Streamer.get_topic("public:local:media", user, oauth_token)
  84. for token <- [read_notifications_token, badly_scoped_token] do
  85. assert {:error, :unauthorized} = Streamer.get_topic("public:local", user, token)
  86. assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", user, token)
  87. end
  88. end
  89. test "allows remote public streams if restricted_unauthenticated is on", %{
  90. user: user,
  91. token: oauth_token
  92. } do
  93. clear_config([:restrict_unauthenticated, :timelines, :federated], true)
  94. %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
  95. %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
  96. assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
  97. assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
  98. assert {:ok, "public:remote:lain.com"} =
  99. Streamer.get_topic("public:remote", user, oauth_token, %{"instance" => "lain.com"})
  100. assert {:ok, "public:remote:media:lain.com"} =
  101. Streamer.get_topic("public:remote:media", user, oauth_token, %{
  102. "instance" => "lain.com"
  103. })
  104. for token <- [read_notifications_token, badly_scoped_token] do
  105. assert {:error, :unauthorized} = Streamer.get_topic("public", user, token)
  106. assert {:error, :unauthorized} = Streamer.get_topic("public:media", user, token)
  107. assert {:error, :unauthorized} =
  108. Streamer.get_topic("public:remote", user, token, %{
  109. "instance" => "lain.com"
  110. })
  111. assert {:error, :unauthorized} =
  112. Streamer.get_topic("public:remote:media", user, token, %{
  113. "instance" => "lain.com"
  114. })
  115. end
  116. end
  117. test "allows user streams (with proper OAuth token scopes)", %{
  118. user: user,
  119. token: read_oauth_token
  120. } do
  121. %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
  122. %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
  123. %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
  124. expected_user_topic = "user:#{user.id}"
  125. expected_notification_topic = "user:notification:#{user.id}"
  126. expected_direct_topic = "direct:#{user.id}"
  127. expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
  128. for valid_user_token <- [read_oauth_token, read_statuses_token] do
  129. assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
  130. assert {:ok, ^expected_direct_topic} =
  131. Streamer.get_topic("direct", user, valid_user_token)
  132. assert {:ok, ^expected_pleroma_chat_topic} =
  133. Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
  134. end
  135. for invalid_user_token <- [read_notifications_token, badly_scoped_token],
  136. user_topic <- ["user", "direct", "user:pleroma_chat"] do
  137. assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
  138. end
  139. for valid_notification_token <- [read_oauth_token, read_notifications_token] do
  140. assert {:ok, ^expected_notification_topic} =
  141. Streamer.get_topic("user:notification", user, valid_notification_token)
  142. end
  143. for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
  144. assert {:error, :unauthorized} =
  145. Streamer.get_topic("user:notification", user, invalid_notification_token)
  146. end
  147. end
  148. test "allows hashtag streams (regardless of OAuth token scopes)", %{
  149. user: user,
  150. token: read_oauth_token
  151. } do
  152. for oauth_token <- [nil, read_oauth_token] do
  153. assert {:ok, "hashtag:cofe"} =
  154. Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
  155. end
  156. end
  157. test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
  158. another_user = insert(:user)
  159. assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
  160. assert {:error, _} =
  161. Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
  162. assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
  163. end
  164. test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
  165. user: user,
  166. token: read_oauth_token
  167. } do
  168. %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
  169. %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
  170. {:ok, list} = List.create("Test", user)
  171. assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
  172. for valid_token <- [read_oauth_token, read_lists_token] do
  173. assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
  174. end
  175. assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
  176. end
  177. test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
  178. another_user = insert(:user)
  179. {:ok, list} = List.create("Test", another_user)
  180. assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
  181. assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
  182. end
  183. end
  184. describe "user streams" do
  185. setup do
  186. %{user: user, token: token} = oauth_access(["read"])
  187. notify = insert(:notification, user: user, activity: build(:note_activity))
  188. {:ok, %{user: user, notify: notify, token: token}}
  189. end
  190. test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
  191. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  192. {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
  193. assert_receive {:render_with_user, _, _, ^activity, _}
  194. refute Streamer.filtered_by_user?(user, activity)
  195. end
  196. test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
  197. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  198. other_user = insert(:user)
  199. {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
  200. {:ok, announce} = CommonAPI.repeat(activity.id, user)
  201. assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
  202. refute Streamer.filtered_by_user?(user, announce)
  203. end
  204. test "it does not stream announces of the user's own posts in the 'user' stream", %{
  205. user: user,
  206. token: oauth_token
  207. } do
  208. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  209. other_user = insert(:user)
  210. {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
  211. {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
  212. assert Streamer.filtered_by_user?(user, announce)
  213. end
  214. test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
  215. user: user,
  216. token: oauth_token
  217. } do
  218. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  219. other_user = insert(:user)
  220. {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
  221. {:ok, announce} = CommonAPI.repeat(activity.id, other_user)
  222. notification =
  223. Pleroma.Notification
  224. |> Repo.get_by(%{user_id: user.id, activity_id: announce.id})
  225. |> Repo.preload(:activity)
  226. refute Streamer.filtered_by_user?(user, notification)
  227. end
  228. test "it streams boosts of mastodon user in the 'user' stream", %{
  229. user: user,
  230. token: oauth_token
  231. } do
  232. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  233. other_user = insert(:user)
  234. {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
  235. data =
  236. File.read!("test/fixtures/mastodon-announce.json")
  237. |> Jason.decode!()
  238. |> Map.put("object", activity.data["object"])
  239. |> Map.put("actor", user.ap_id)
  240. {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
  241. Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
  242. assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
  243. refute Streamer.filtered_by_user?(user, announce)
  244. end
  245. test "it sends notify to in the 'user' stream", %{
  246. user: user,
  247. token: oauth_token,
  248. notify: notify
  249. } do
  250. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  251. Streamer.stream("user", notify)
  252. assert_receive {:render_with_user, _, _, ^notify, _}
  253. refute Streamer.filtered_by_user?(user, notify)
  254. end
  255. test "it sends notify to in the 'user:notification' stream", %{
  256. user: user,
  257. token: oauth_token,
  258. notify: notify
  259. } do
  260. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  261. Streamer.stream("user:notification", notify)
  262. assert_receive {:render_with_user, _, _, ^notify, _}
  263. refute Streamer.filtered_by_user?(user, notify)
  264. end
  265. test "it sends chat messages to the 'user:pleroma_chat' stream", %{
  266. user: user,
  267. token: oauth_token
  268. } do
  269. other_user = insert(:user)
  270. {:ok, create_activity} =
  271. CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123")
  272. object = Object.normalize(create_activity, fetch: false)
  273. chat = Chat.get(user.id, other_user.ap_id)
  274. cm_ref = MessageReference.for_chat_and_object(chat, object)
  275. cm_ref = %{cm_ref | chat: chat, object: object}
  276. Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
  277. Streamer.stream("user:pleroma_chat", {user, cm_ref})
  278. text =
  279. StreamerView.render(
  280. "chat_update.json",
  281. %{chat_message_reference: cm_ref},
  282. "user:pleroma_chat:#{user.id}"
  283. )
  284. assert text =~ "hey cirno"
  285. assert_receive {:text, ^text}
  286. end
  287. test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
  288. other_user = insert(:user)
  289. {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
  290. object = Object.normalize(create_activity, fetch: false)
  291. chat = Chat.get(user.id, other_user.ap_id)
  292. cm_ref = MessageReference.for_chat_and_object(chat, object)
  293. cm_ref = %{cm_ref | chat: chat, object: object}
  294. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  295. Streamer.stream("user", {user, cm_ref})
  296. text =
  297. StreamerView.render(
  298. "chat_update.json",
  299. %{chat_message_reference: cm_ref},
  300. "user:#{user.id}"
  301. )
  302. assert text =~ "hey cirno"
  303. assert_receive {:text, ^text}
  304. end
  305. test "it sends chat message notifications to the 'user:notification' stream", %{
  306. user: user,
  307. token: oauth_token
  308. } do
  309. other_user = insert(:user)
  310. {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
  311. notify =
  312. Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
  313. |> Repo.preload(:activity)
  314. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  315. Streamer.stream("user:notification", notify)
  316. assert_receive {:render_with_user, _, _, ^notify, _}
  317. refute Streamer.filtered_by_user?(user, notify)
  318. end
  319. test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
  320. user: user,
  321. token: oauth_token
  322. } do
  323. blocked = insert(:user)
  324. {:ok, _user_relationship} = User.block(user, blocked)
  325. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  326. {:ok, activity} = CommonAPI.post(user, %{status: ":("})
  327. {:ok, _} = CommonAPI.favorite(activity.id, blocked)
  328. refute_receive _
  329. end
  330. test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
  331. user: user,
  332. token: oauth_token
  333. } do
  334. user2 = insert(:user)
  335. {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
  336. {:ok, _} = CommonAPI.add_mute(activity, user)
  337. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  338. {:ok, favorite_activity} = CommonAPI.favorite(activity.id, user2)
  339. refute_receive _
  340. assert Streamer.filtered_by_user?(user, favorite_activity)
  341. end
  342. test "it sends favorite to 'user:notification' stream'", %{
  343. user: user,
  344. token: oauth_token
  345. } do
  346. user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
  347. {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
  348. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  349. {:ok, favorite_activity} = CommonAPI.favorite(activity.id, user2)
  350. assert_receive {:render_with_user, _, "notification.json", notif, _}
  351. assert notif.activity.id == favorite_activity.id
  352. refute Streamer.filtered_by_user?(user, notif)
  353. end
  354. test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
  355. user: user,
  356. token: oauth_token
  357. } do
  358. user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
  359. {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
  360. {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
  361. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  362. {:ok, favorite_activity} = CommonAPI.favorite(activity.id, user2)
  363. refute_receive _
  364. assert Streamer.filtered_by_user?(user, favorite_activity)
  365. end
  366. test "it sends follow activities to the 'user:notification' stream", %{
  367. user: user,
  368. token: oauth_token
  369. } do
  370. user2 = insert(:user)
  371. Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
  372. {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user, user2)
  373. assert_receive {:render_with_user, _, "notification.json", notif, _}
  374. assert notif.activity.id == follow_activity.id
  375. refute Streamer.filtered_by_user?(user, notif)
  376. end
  377. test "it sends follow relationships updates to the 'user' stream", %{
  378. user: user,
  379. token: oauth_token
  380. } do
  381. user_id = user.id
  382. other_user = insert(:user)
  383. other_user_id = other_user.id
  384. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  385. {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(other_user, user)
  386. assert_receive {:text, event}
  387. assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
  388. Jason.decode!(event)
  389. assert %{
  390. "follower" => %{
  391. "follower_count" => 0,
  392. "following_count" => 0,
  393. "id" => ^user_id
  394. },
  395. "following" => %{
  396. "follower_count" => 0,
  397. "following_count" => 0,
  398. "id" => ^other_user_id
  399. },
  400. "state" => "follow_pending"
  401. } = Jason.decode!(payload)
  402. assert_receive {:text, event}
  403. assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
  404. Jason.decode!(event)
  405. assert %{
  406. "follower" => %{
  407. "follower_count" => 0,
  408. "following_count" => 1,
  409. "id" => ^user_id
  410. },
  411. "following" => %{
  412. "follower_count" => 1,
  413. "following_count" => 0,
  414. "id" => ^other_user_id
  415. },
  416. "state" => "follow_accept"
  417. } = Jason.decode!(payload)
  418. end
  419. test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do
  420. sender = insert(:user)
  421. {:ok, _, _, _} = CommonAPI.follow(sender, user)
  422. {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
  423. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  424. {:ok, edited} = CommonAPI.update(activity, sender, %{status: "mew mew"})
  425. create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
  426. assert_receive {:render_with_user, _, "status_update.json", ^create, _}
  427. refute Streamer.filtered_by_user?(user, edited)
  428. end
  429. test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do
  430. {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
  431. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  432. {:ok, edited} = CommonAPI.update(activity, user, %{status: "mew mew"})
  433. create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])
  434. assert_receive {:render_with_user, _, "status_update.json", ^create, _}
  435. refute Streamer.filtered_by_user?(user, edited)
  436. end
  437. test "it streams posts containing followed hashtags on the 'user' stream", %{
  438. user: user,
  439. token: oauth_token
  440. } do
  441. hashtag = insert(:hashtag, %{name: "tenshi"})
  442. other_user = insert(:user)
  443. {:ok, user} = User.follow_hashtag(user, hashtag)
  444. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  445. {:ok, activity} = CommonAPI.post(other_user, %{status: "hey #tenshi"})
  446. assert_receive {:render_with_user, _, "update.json", ^activity, _}
  447. end
  448. test "should not stream private posts containing followed hashtags on the 'user' stream", %{
  449. user: user,
  450. token: oauth_token
  451. } do
  452. hashtag = insert(:hashtag, %{name: "tenshi"})
  453. other_user = insert(:user)
  454. {:ok, user} = User.follow_hashtag(user, hashtag)
  455. Streamer.get_topic_and_add_socket("user", user, oauth_token)
  456. {:ok, activity} =
  457. CommonAPI.post(other_user, %{status: "hey #tenshi", visibility: "private"})
  458. refute_receive {:render_with_user, _, "update.json", ^activity, _}
  459. end
  460. end
  461. describe "public streams" do
  462. test "it sends to public (authenticated)" do
  463. %{user: user, token: oauth_token} = oauth_access(["read"])
  464. other_user = insert(:user)
  465. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  466. {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
  467. assert_receive {:render_with_user, _, _, ^activity, _}
  468. refute Streamer.filtered_by_user?(other_user, activity)
  469. end
  470. test "it sends to public (unauthenticated)" do
  471. user = insert(:user)
  472. Streamer.get_topic_and_add_socket("public", nil, nil)
  473. {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
  474. activity_id = activity.id
  475. assert_receive {:text, event}
  476. assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
  477. assert %{"id" => ^activity_id} = Jason.decode!(payload)
  478. {:ok, _} = CommonAPI.delete(activity.id, user)
  479. assert_receive {:text, event}
  480. assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
  481. end
  482. test "handles deletions" do
  483. %{user: user, token: oauth_token} = oauth_access(["read"])
  484. other_user = insert(:user)
  485. {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
  486. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  487. {:ok, _} = CommonAPI.delete(activity.id, other_user)
  488. activity_id = activity.id
  489. assert_receive {:text, event}
  490. assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
  491. end
  492. test "it streams edits in the 'public' stream" do
  493. sender = insert(:user)
  494. Streamer.get_topic_and_add_socket("public", nil, nil)
  495. {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
  496. assert_receive {:text, _}
  497. {:ok, edited} = CommonAPI.update(activity, sender, %{status: "mew mew"})
  498. edited = Pleroma.Activity.normalize(edited)
  499. %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
  500. assert_receive {:text, event}
  501. assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
  502. assert %{"id" => ^activity_id} = Jason.decode!(payload)
  503. refute Streamer.filtered_by_user?(sender, edited)
  504. end
  505. test "it streams multiple edits in the 'public' stream correctly" do
  506. sender = insert(:user)
  507. Streamer.get_topic_and_add_socket("public", nil, nil)
  508. {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
  509. assert_receive {:text, _}
  510. {:ok, edited} = CommonAPI.update(activity, sender, %{status: "mew mew"})
  511. edited = Pleroma.Activity.normalize(edited)
  512. %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
  513. assert_receive {:text, event}
  514. assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
  515. assert %{"id" => ^activity_id} = Jason.decode!(payload)
  516. refute Streamer.filtered_by_user?(sender, edited)
  517. {:ok, edited} = CommonAPI.update(activity, sender, %{status: "mew mew 2"})
  518. edited = Pleroma.Activity.normalize(edited)
  519. %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])
  520. assert_receive {:text, event}
  521. assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
  522. assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload)
  523. refute Streamer.filtered_by_user?(sender, edited)
  524. end
  525. end
  526. describe "thread_containment/2" do
  527. test "it filters to user if recipients invalid and thread containment is enabled" do
  528. clear_config([:instance, :skip_thread_containment], false)
  529. author = insert(:user)
  530. %{user: user, token: oauth_token} = oauth_access(["read"])
  531. User.follow(user, author, :follow_accept)
  532. activity =
  533. insert(:note_activity,
  534. note:
  535. insert(:note,
  536. user: author,
  537. data: %{"to" => ["TEST-FFF"]}
  538. )
  539. )
  540. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  541. Streamer.stream("public", activity)
  542. assert_receive {:render_with_user, _, _, ^activity, _}
  543. assert Streamer.filtered_by_user?(user, activity)
  544. end
  545. test "it sends message if recipients invalid and thread containment is disabled" do
  546. clear_config([:instance, :skip_thread_containment], true)
  547. author = insert(:user)
  548. %{user: user, token: oauth_token} = oauth_access(["read"])
  549. User.follow(user, author, :follow_accept)
  550. activity =
  551. insert(:note_activity,
  552. note:
  553. insert(:note,
  554. user: author,
  555. data: %{"to" => ["TEST-FFF"]}
  556. )
  557. )
  558. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  559. Streamer.stream("public", activity)
  560. assert_receive {:render_with_user, _, _, ^activity, _}
  561. refute Streamer.filtered_by_user?(user, activity)
  562. end
  563. test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
  564. clear_config([:instance, :skip_thread_containment], false)
  565. author = insert(:user)
  566. user = insert(:user, skip_thread_containment: true)
  567. %{token: oauth_token} = oauth_access(["read"], user: user)
  568. User.follow(user, author, :follow_accept)
  569. activity =
  570. insert(:note_activity,
  571. note:
  572. insert(:note,
  573. user: author,
  574. data: %{"to" => ["TEST-FFF"]}
  575. )
  576. )
  577. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  578. Streamer.stream("public", activity)
  579. assert_receive {:render_with_user, _, _, ^activity, _}
  580. refute Streamer.filtered_by_user?(user, activity)
  581. end
  582. end
  583. describe "blocks" do
  584. setup do: oauth_access(["read"])
  585. test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
  586. blocked_user = insert(:user)
  587. {:ok, _user_relationship} = User.block(user, blocked_user)
  588. Streamer.get_topic_and_add_socket("public", user, oauth_token)
  589. {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
  590. assert_receive {:render_with_user, _, _, ^activity, _}
  591. assert Streamer.filtered_by_user?(user, activity)
  592. end
  593. test "it filters messages transitively involving blocked users", %{
  594. user: blocker,
  595. token: blocker_token
  596. } do
  597. blockee = insert(:user)
  598. friend = insert(:user)
  599. Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
  600. {:ok, _user_relationship} = User.block(blocker, blockee)
  601. {:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
  602. assert_receive {:render_with_user, _, _, ^activity_one, _}
  603. assert Streamer.filtered_by_user?(blocker, activity_one)
  604. {:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
  605. assert_receive {:render_with_user, _, _, ^activity_two, _}
  606. assert Streamer.filtered_by_user?(blocker, activity_two)
  607. {:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
  608. assert_receive {:render_with_user, _, _, ^activity_three, _}
  609. assert Streamer.filtered_by_user?(blocker, activity_three)
  610. end
  611. end
  612. describe "lists" do
  613. setup do: oauth_access(["read"])
  614. test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
  615. user_b = insert(:user)
  616. user_c = insert(:user)
  617. {:ok, user_a, user_b} = User.follow(user_a, user_b)
  618. {:ok, list} = List.create("Test", user_a)
  619. {:ok, list} = List.follow(list, user_b)
  620. Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
  621. {:ok, _activity} =
  622. CommonAPI.post(user_b, %{
  623. status: "@#{user_c.nickname} Test",
  624. visibility: "direct"
  625. })
  626. refute_receive _
  627. end
  628. test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
  629. user_b = insert(:user)
  630. {:ok, list} = List.create("Test", user_a)
  631. {:ok, list} = List.follow(list, user_b)
  632. Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
  633. {:ok, _activity} =
  634. CommonAPI.post(user_b, %{
  635. status: "Test",
  636. visibility: "private"
  637. })
  638. refute_receive _
  639. end
  640. test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
  641. user_b = insert(:user)
  642. {:ok, user_a, user_b} = User.follow(user_a, user_b)
  643. {:ok, list} = List.create("Test", user_a)
  644. {:ok, list} = List.follow(list, user_b)
  645. Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
  646. {:ok, activity} =
  647. CommonAPI.post(user_b, %{
  648. status: "Test",
  649. visibility: "private"
  650. })
  651. assert_receive {:render_with_user, _, _, ^activity, _}
  652. refute Streamer.filtered_by_user?(user_a, activity)
  653. end
  654. end
  655. describe "muted reblogs" do
  656. setup do: oauth_access(["read"])
  657. test "it filters muted reblogs", %{user: user1, token: user1_token} do
  658. user2 = insert(:user)
  659. user3 = insert(:user)
  660. CommonAPI.follow(user2, user1)
  661. CommonAPI.hide_reblogs(user2, user1)
  662. {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
  663. Streamer.get_topic_and_add_socket("user", user1, user1_token)
  664. {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
  665. assert_receive {:render_with_user, _, _, ^announce_activity, _}
  666. assert Streamer.filtered_by_user?(user1, announce_activity)
  667. end
  668. test "it filters reblog notification for reblog-muted actors", %{
  669. user: user1,
  670. token: user1_token
  671. } do
  672. user2 = insert(:user)
  673. CommonAPI.follow(user2, user1)
  674. CommonAPI.hide_reblogs(user2, user1)
  675. {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
  676. Streamer.get_topic_and_add_socket("user", user1, user1_token)
  677. {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
  678. assert_receive {:render_with_user, _, "notification.json", notif, _}
  679. assert Streamer.filtered_by_user?(user1, notif)
  680. end
  681. test "it send non-reblog notification for reblog-muted actors", %{
  682. user: user1,
  683. token: user1_token
  684. } do
  685. user2 = insert(:user)
  686. CommonAPI.follow(user2, user1)
  687. CommonAPI.hide_reblogs(user2, user1)
  688. {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
  689. Streamer.get_topic_and_add_socket("user", user1, user1_token)
  690. {:ok, _favorite_activity} = CommonAPI.favorite(create_activity.id, user2)
  691. assert_receive {:render_with_user, _, "notification.json", notif, _}
  692. refute Streamer.filtered_by_user?(user1, notif)
  693. end
  694. end
  695. describe "muted threads" do
  696. test "it filters posts from muted threads" do
  697. user = insert(:user)
  698. %{user: user2, token: user2_token} = oauth_access(["read"])
  699. Streamer.get_topic_and_add_socket("user", user2, user2_token)
  700. {:ok, user2, user, _activity} = CommonAPI.follow(user, user2)
  701. {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
  702. {:ok, _} = CommonAPI.add_mute(activity, user2)
  703. assert_receive {:render_with_user, _, _, ^activity, _}
  704. assert Streamer.filtered_by_user?(user2, activity)
  705. end
  706. end
  707. describe "direct streams" do
  708. setup do: oauth_access(["read"])
  709. test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
  710. another_user = insert(:user)
  711. Streamer.get_topic_and_add_socket("direct", user, oauth_token)
  712. {:ok, _create_activity} =
  713. CommonAPI.post(another_user, %{
  714. status: "hey @#{user.nickname}",
  715. visibility: "direct"
  716. })
  717. assert_receive {:text, received_event}
  718. assert %{"event" => "conversation", "payload" => received_payload} =
  719. Jason.decode!(received_event)
  720. assert %{"last_status" => last_status} = Jason.decode!(received_payload)
  721. [participation] = Participation.for_user(user)
  722. assert last_status["pleroma"]["direct_conversation_id"] == participation.id
  723. end
  724. test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
  725. %{user: user, token: oauth_token} do
  726. another_user = insert(:user)
  727. Streamer.get_topic_and_add_socket("direct", user, oauth_token)
  728. {:ok, create_activity} =
  729. CommonAPI.post(another_user, %{
  730. status: "hi @#{user.nickname}",
  731. visibility: "direct"
  732. })
  733. create_activity_id = create_activity.id
  734. assert_receive {:render_with_user, _, _, ^create_activity, _}
  735. assert_receive {:text, received_conversation1}
  736. assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
  737. {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
  738. assert_receive {:text, received_event}
  739. assert %{"event" => "delete", "payload" => ^create_activity_id} =
  740. Jason.decode!(received_event)
  741. refute_receive _
  742. end
  743. @tag :erratic
  744. test "it sends conversation update to the 'direct' stream when a message is deleted", %{
  745. user: user,
  746. token: oauth_token
  747. } do
  748. another_user = insert(:user)
  749. Streamer.get_topic_and_add_socket("direct", user, oauth_token)
  750. {:ok, create_activity} =
  751. CommonAPI.post(another_user, %{
  752. status: "hi @#{user.nickname}",
  753. visibility: "direct"
  754. })
  755. {:ok, create_activity2} =
  756. CommonAPI.post(another_user, %{
  757. status: "hi @#{user.nickname} 2",
  758. in_reply_to_status_id: create_activity.id,
  759. visibility: "direct"
  760. })
  761. assert_receive {:render_with_user, _, _, ^create_activity, _}
  762. assert_receive {:render_with_user, _, _, ^create_activity2, _}
  763. assert_receive {:text, received_conversation1}
  764. assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
  765. assert_receive {:text, received_conversation1}
  766. assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
  767. {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
  768. assert_receive {:text, received_event}
  769. assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
  770. assert_receive {:text, received_event}
  771. assert %{"event" => "conversation", "payload" => received_payload} =
  772. Jason.decode!(received_event)
  773. assert %{"last_status" => last_status} = Jason.decode!(received_payload)
  774. assert last_status["id"] == to_string(create_activity.id)
  775. end
  776. end
  777. describe "stop streaming if token got revoked" do
  778. setup do
  779. child_proc = fn start, finalize ->
  780. fn ->
  781. start.()
  782. receive do
  783. {StreamerTest, :ready} ->
  784. assert_receive {:render_with_user, _, "update.json", _, _}
  785. receive do
  786. {StreamerTest, :revoked} -> finalize.()
  787. end
  788. end
  789. end
  790. end
  791. starter = fn user, token ->
  792. fn -> Streamer.get_topic_and_add_socket("user", user, token) end
  793. end
  794. hit = fn -> assert_receive :close end
  795. miss = fn -> refute_receive :close end
  796. send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end
  797. %{
  798. child_proc: child_proc,
  799. starter: starter,
  800. hit: hit,
  801. miss: miss,
  802. send_all: send_all
  803. }
  804. end
  805. test "do not revoke other tokens", %{
  806. child_proc: child_proc,
  807. starter: starter,
  808. hit: hit,
  809. miss: miss,
  810. send_all: send_all
  811. } do
  812. %{user: user, token: token} = oauth_access(["read"])
  813. %{token: token2} = oauth_access(["read"], user: user)
  814. %{user: user2, token: user2_token} = oauth_access(["read"])
  815. post_user = insert(:user)
  816. CommonAPI.follow(post_user, user)
  817. CommonAPI.follow(post_user, user2)
  818. tasks = [
  819. Task.async(child_proc.(starter.(user, token), hit)),
  820. Task.async(child_proc.(starter.(user, token2), miss)),
  821. Task.async(child_proc.(starter.(user2, user2_token), miss))
  822. ]
  823. {:ok, _} =
  824. CommonAPI.post(post_user, %{
  825. status: "hi"
  826. })
  827. send_all.(tasks, {StreamerTest, :ready})
  828. Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
  829. send_all.(tasks, {StreamerTest, :revoked})
  830. Enum.each(tasks, &Task.await/1)
  831. end
  832. test "revoke all streams for this token", %{
  833. child_proc: child_proc,
  834. starter: starter,
  835. hit: hit,
  836. send_all: send_all
  837. } do
  838. %{user: user, token: token} = oauth_access(["read"])
  839. post_user = insert(:user)
  840. CommonAPI.follow(post_user, user)
  841. tasks = [
  842. Task.async(child_proc.(starter.(user, token), hit)),
  843. Task.async(child_proc.(starter.(user, token), hit))
  844. ]
  845. {:ok, _} =
  846. CommonAPI.post(post_user, %{
  847. status: "hi"
  848. })
  849. send_all.(tasks, {StreamerTest, :ready})
  850. Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
  851. send_all.(tasks, {StreamerTest, :revoked})
  852. Enum.each(tasks, &Task.await/1)
  853. end
  854. end
  855. end