logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma. git clone https://hacktivis.me/git/pleroma.git

streamer_test.exs (38689B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.StreamerTest do
  5. use Pleroma.DataCase
  6. import Pleroma.Factory
  7. alias Pleroma.Chat
  8. alias Pleroma.Chat.MessageReference
  9. alias Pleroma.Conversation.Participation
  10. alias Pleroma.List
  11. alias Pleroma.Object
  12. alias Pleroma.User
  13. alias Pleroma.Web.CommonAPI
  14. alias Pleroma.Web.Streamer
  15. alias Pleroma.Web.StreamerView
  16. @moduletag needs_streamer: true, capture_log: true
  17. setup do: clear_config([:instance, :skip_thread_containment])
  # Topic resolution for anonymous clients: both the user and the OAuth token
  # arguments are nil in every call below.
  describe "get_topic/_ (unauthenticated)" do
    test "allows no stream" do
      assert {:ok, nil} = Streamer.get_topic(nil, nil, nil)
    end

    test "allows public" do
      # Every public stream name resolves to a topic of the same name.
      for stream <- ["public", "public:local", "public:media", "public:local:media"] do
        assert {:ok, ^stream} = Streamer.get_topic(stream, nil, nil)
      end
    end

    test "rejects local public streams if restricted_unauthenticated is on" do
      clear_config([:restrict_unauthenticated, :timelines, :local], true)

      for stream <- ["public:local", "public:local:media"] do
        assert {:error, :unauthorized} = Streamer.get_topic(stream, nil, nil)
      end
    end

    test "rejects remote public streams if restricted_unauthenticated is on" do
      clear_config([:restrict_unauthenticated, :timelines, :federated], true)

      for stream <- ["public", "public:media"] do
        assert {:error, :unauthorized} = Streamer.get_topic(stream, nil, nil)
      end

      # Instance-scoped streams take the instance host as an extra parameter.
      for stream <- ["public:remote", "public:remote:media"] do
        assert {:error, :unauthorized} =
                 Streamer.get_topic(stream, nil, nil, %{"instance" => "lain.com"})
      end
    end

    test "allows instance streams" do
      expectations = [
        {"public:remote", "public:remote:lain.com"},
        {"public:remote:media", "public:remote:media:lain.com"}
      ]

      for {stream, topic} <- expectations do
        assert {:ok, ^topic} = Streamer.get_topic(stream, nil, nil, %{"instance" => "lain.com"})
      end
    end

    test "allows hashtag streams" do
      tag_params = %{"tag" => "cofe"}
      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, tag_params)
    end

    test "disallows user streams" do
      for stream <- ["user", "user:notification", "direct"] do
        assert {:error, _} = Streamer.get_topic(stream, nil, nil)
      end
    end

    test "disallows list streams" do
      list_params = %{"list" => 42}
      assert {:error, _} = Streamer.get_topic("list", nil, nil, list_params)
    end
  end
  # Topic resolution for a logged-in user. The describe-level setup supplies a
  # user together with an OAuth token created with the "read" scope.
  describe "get_topic/_ (authenticated)" do
    setup do: oauth_access(["read"])

    test "allows public streams (regardless of OAuth token scopes)", %{
      user: user,
      token: read_oauth_token
    } do
      # Try each token in turn, including the absence of a token.
      for oauth_token <- [nil, read_oauth_token] do
        assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
        assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
        assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)

        assert {:ok, "public:local:media"} =
                 Streamer.get_topic("public:local:media", user, oauth_token)
      end
    end

    test "allows local public streams if restricted_unauthenticated is on", %{
      user: user,
      token: oauth_token
    } do
      clear_config([:restrict_unauthenticated, :timelines, :local], true)

      %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
      %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)

      assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)

      assert {:ok, "public:local:media"} =
               Streamer.get_topic("public:local:media", user, oauth_token)

      # Tokens without the plain "read" scope are rejected.
      for token <- [read_notifications_token, badly_scoped_token] do
        assert {:error, :unauthorized} = Streamer.get_topic("public:local", user, token)
        assert {:error, :unauthorized} = Streamer.get_topic("public:local:media", user, token)
      end
    end

    test "allows remote public streams if restricted_unauthenticated is on", %{
      user: user,
      token: oauth_token
    } do
      clear_config([:restrict_unauthenticated, :timelines, :federated], true)

      %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
      %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)

      assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
      assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)

      assert {:ok, "public:remote:lain.com"} =
               Streamer.get_topic("public:remote", user, oauth_token, %{"instance" => "lain.com"})

      assert {:ok, "public:remote:media:lain.com"} =
               Streamer.get_topic("public:remote:media", user, oauth_token, %{
                 "instance" => "lain.com"
               })

      for token <- [read_notifications_token, badly_scoped_token] do
        assert {:error, :unauthorized} = Streamer.get_topic("public", user, token)
        assert {:error, :unauthorized} = Streamer.get_topic("public:media", user, token)

        assert {:error, :unauthorized} =
                 Streamer.get_topic("public:remote", user, token, %{
                   "instance" => "lain.com"
                 })

        assert {:error, :unauthorized} =
                 Streamer.get_topic("public:remote:media", user, token, %{
                   "instance" => "lain.com"
                 })
      end
    end

    test "allows user streams (with proper OAuth token scopes)", %{
      user: user,
      token: read_oauth_token
    } do
      %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
      %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
      %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)

      # Per-user topics are the stream name suffixed with the user's id.
      expected_user_topic = "user:#{user.id}"
      expected_notification_topic = "user:notification:#{user.id}"
      expected_direct_topic = "direct:#{user.id}"
      expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"

      for valid_user_token <- [read_oauth_token, read_statuses_token] do
        assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)

        assert {:ok, ^expected_direct_topic} =
                 Streamer.get_topic("direct", user, valid_user_token)

        assert {:ok, ^expected_pleroma_chat_topic} =
                 Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
      end

      for invalid_user_token <- [read_notifications_token, badly_scoped_token],
          user_topic <- ["user", "direct", "user:pleroma_chat"] do
        assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
      end

      for valid_notification_token <- [read_oauth_token, read_notifications_token] do
        assert {:ok, ^expected_notification_topic} =
                 Streamer.get_topic("user:notification", user, valid_notification_token)
      end

      for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
        assert {:error, :unauthorized} =
                 Streamer.get_topic("user:notification", user, invalid_notification_token)
      end
    end

    test "allows hashtag streams (regardless of OAuth token scopes)", %{
      user: user,
      token: read_oauth_token
    } do
      for oauth_token <- [nil, read_oauth_token] do
        assert {:ok, "hashtag:cofe"} =
                 Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
      end
    end

    test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
      another_user = insert(:user)

      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)

      assert {:error, _} =
               Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)

      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
    end

    test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
      user: user,
      token: read_oauth_token
    } do
      %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
      %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
      {:ok, list} = List.create("Test", user)

      # The list id must be passed as a parameter, not embedded in the stream name.
      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)

      for valid_token <- [read_oauth_token, read_lists_token] do
        assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
      end

      assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
    end

    test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
      another_user = insert(:user)
      {:ok, list} = List.create("Test", another_user)

      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
      assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
    end
  end
  # Delivery to the per-user streams ("user", "user:notification",
  # "user:pleroma_chat"). The test process itself is registered as the socket,
  # so streamed items arrive in its mailbox.
  describe "user streams" do
    setup do
      %{user: user, token: token} = oauth_access(["read"])
      notify = insert(:notification, user: user, activity: build(:note_activity))
      {:ok, %{user: user, notify: notify, token: token}}
    end

    test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      {:ok, activity} = CommonAPI.post(user, %{status: "hey"})

      assert_receive {:render_with_user, _, _, ^activity, _}
      refute Streamer.filtered_by_user?(user, activity)
    end

    test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)

      other_user = insert(:user)
      {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
      {:ok, announce} = CommonAPI.repeat(activity.id, user)

      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
      refute Streamer.filtered_by_user?(user, announce)
    end

    test "it does not stream announces of the user's own posts in the 'user' stream", %{
      user: user,
      token: oauth_token
    } do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)

      other_user = insert(:user)
      {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
      {:ok, announce} = CommonAPI.repeat(activity.id, other_user)

      assert Streamer.filtered_by_user?(user, announce)
    end

    test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
      user: user,
      token: oauth_token
    } do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)

      other_user = insert(:user)
      {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
      {:ok, announce} = CommonAPI.repeat(activity.id, other_user)

      # Look up the notification row for the announce and check it instead of
      # the announce activity itself.
      notification =
        Pleroma.Notification
        |> Repo.get_by(%{user_id: user.id, activity_id: announce.id})
        |> Repo.preload(:activity)

      refute Streamer.filtered_by_user?(user, notification)
    end

    test "it streams boosts of mastodon user in the 'user' stream", %{
      user: user,
      token: oauth_token
    } do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)

      other_user = insert(:user)
      {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})

      # Re-point the fixture announce at the freshly created post and make the
      # subscribed user its actor.
      data =
        "test/fixtures/mastodon-announce.json"
        |> File.read!()
        |> Jason.decode!()
        |> Map.put("object", activity.data["object"])
        |> Map.put("actor", user.ap_id)

      {:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
        Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)

      assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce, _}
      refute Streamer.filtered_by_user?(user, announce)
    end

    test "it sends notify to the 'user' stream", %{
      user: user,
      token: oauth_token,
      notify: notify
    } do
      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      Streamer.stream("user", notify)

      assert_receive {:render_with_user, _, _, ^notify, _}
      refute Streamer.filtered_by_user?(user, notify)
    end

    test "it sends notify to the 'user:notification' stream", %{
      user: user,
      token: oauth_token,
      notify: notify
    } do
      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
      Streamer.stream("user:notification", notify)

      assert_receive {:render_with_user, _, _, ^notify, _}
      refute Streamer.filtered_by_user?(user, notify)
    end

    test "it sends chat messages to the 'user:pleroma_chat' stream", %{
      user: user,
      token: oauth_token
    } do
      other_user = insert(:user)

      {:ok, create_activity} =
        CommonAPI.post_chat_message(other_user, user, "hey cirno", idempotency_key: "123")

      object = Object.normalize(create_activity, fetch: false)
      chat = Chat.get(user.id, other_user.ap_id)
      cm_ref = MessageReference.for_chat_and_object(chat, object)
      cm_ref = %{cm_ref | chat: chat, object: object}

      Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
      Streamer.stream("user:pleroma_chat", {user, cm_ref})

      # The socket is expected to receive exactly what the view renders for this topic.
      text =
        StreamerView.render(
          "chat_update.json",
          %{chat_message_reference: cm_ref},
          "user:pleroma_chat:#{user.id}"
        )

      assert text =~ "hey cirno"
      assert_receive {:text, ^text}
    end

    test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
      other_user = insert(:user)

      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
      object = Object.normalize(create_activity, fetch: false)
      chat = Chat.get(user.id, other_user.ap_id)
      cm_ref = MessageReference.for_chat_and_object(chat, object)
      cm_ref = %{cm_ref | chat: chat, object: object}

      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      Streamer.stream("user", {user, cm_ref})

      text =
        StreamerView.render(
          "chat_update.json",
          %{chat_message_reference: cm_ref},
          "user:#{user.id}"
        )

      assert text =~ "hey cirno"
      assert_receive {:text, ^text}
    end

    test "it sends chat message notifications to the 'user:notification' stream", %{
      user: user,
      token: oauth_token
    } do
      other_user = insert(:user)

      {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")

      notify =
        Pleroma.Notification
        |> Repo.get_by(user_id: user.id, activity_id: create_activity.id)
        |> Repo.preload(:activity)

      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
      Streamer.stream("user:notification", notify)

      assert_receive {:render_with_user, _, _, ^notify, _}
      refute Streamer.filtered_by_user?(user, notify)
    end

    test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
      user: user,
      token: oauth_token
    } do
      blocked = insert(:user)
      {:ok, _user_relationship} = User.block(user, blocked)

      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)

      {:ok, activity} = CommonAPI.post(user, %{status: ":("})
      {:ok, _} = CommonAPI.favorite(blocked, activity.id)

      refute_receive _
    end

    test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
      user: user,
      token: oauth_token
    } do
      user2 = insert(:user)

      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
      {:ok, _} = CommonAPI.add_mute(user, activity)

      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)

      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)

      refute_receive _
      assert Streamer.filtered_by_user?(user, favorite_activity)
    end

    test "it sends favorite to the 'user:notification' stream", %{
      user: user,
      token: oauth_token
    } do
      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)

      assert_receive {:render_with_user, _, "notification.json", notif, _}
      assert notif.activity.id == favorite_activity.id
      refute Streamer.filtered_by_user?(user, notif)
    end

    test "it doesn't send notify to the 'user:notification' stream when a domain is blocked", %{
      user: user,
      token: oauth_token
    } do
      user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})

      # Rebind `user` so later calls see the updated domain blocks.
      {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
      {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)

      refute_receive _
      assert Streamer.filtered_by_user?(user, favorite_activity)
    end

    test "it sends follow activities to the 'user:notification' stream", %{
      user: user,
      token: oauth_token
    } do
      user2 = insert(:user)

      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
      {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)

      assert_receive {:render_with_user, _, "notification.json", notif, _}
      assert notif.activity.id == follow_activity.id
      refute Streamer.filtered_by_user?(user, notif)
    end

    test "it sends follow relationships updates to the 'user' stream", %{
      user: user,
      token: oauth_token
    } do
      user_id = user.id
      other_user = insert(:user)
      other_user_id = other_user.id

      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user, other_user)

      # First event: the follow is pending and the counters are untouched.
      assert_receive {:text, event}

      assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
               Jason.decode!(event)

      assert %{
               "follower" => %{
                 "follower_count" => 0,
                 "following_count" => 0,
                 "id" => ^user_id
               },
               "following" => %{
                 "follower_count" => 0,
                 "following_count" => 0,
                 "id" => ^other_user_id
               },
               "state" => "follow_pending"
             } = Jason.decode!(payload)

      # Second event: the follow is accepted and both counters are bumped.
      assert_receive {:text, event}

      assert %{"event" => "pleroma:follow_relationships_update", "payload" => payload} =
               Jason.decode!(event)

      assert %{
               "follower" => %{
                 "follower_count" => 0,
                 "following_count" => 1,
                 "id" => ^user_id
               },
               "following" => %{
                 "follower_count" => 1,
                 "following_count" => 0,
                 "id" => ^other_user_id
               },
               "state" => "follow_accept"
             } = Jason.decode!(payload)
    end

    test "it streams edits in the 'user' stream", %{user: user, token: oauth_token} do
      sender = insert(:user)
      {:ok, _, _, _} = CommonAPI.follow(user, sender)

      {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})

      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})

      # The streamed item is the Create activity of the edited object.
      create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])

      assert_receive {:render_with_user, _, "status_update.json", ^create, _}
      refute Streamer.filtered_by_user?(user, edited)
    end

    test "it streams own edits in the 'user' stream", %{user: user, token: oauth_token} do
      {:ok, activity} = CommonAPI.post(user, %{status: "hey"})

      Streamer.get_topic_and_add_socket("user", user, oauth_token)
      {:ok, edited} = CommonAPI.update(user, activity, %{status: "mew mew"})

      create = Pleroma.Activity.get_create_by_object_ap_id_with_object(activity.object.data["id"])

      assert_receive {:render_with_user, _, "status_update.json", ^create, _}
      refute Streamer.filtered_by_user?(user, edited)
    end
  end
  # Delivery to the "public" stream for authenticated and anonymous sockets.
  # Authenticated sockets receive `{:render_with_user, ...}` tuples; anonymous
  # sockets receive pre-rendered `{:text, json}` messages.
  describe "public streams" do
    test "it sends to public (authenticated)" do
      %{user: user, token: oauth_token} = oauth_access(["read"])
      other_user = insert(:user)

      Streamer.get_topic_and_add_socket("public", user, oauth_token)

      {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})

      assert_receive {:render_with_user, _, _, ^activity, _}
      # Check the filter for the subscribed user, not for the post's author.
      refute Streamer.filtered_by_user?(user, activity)
    end

    test "it sends to public (unauthenticated)" do
      user = insert(:user)

      Streamer.get_topic_and_add_socket("public", nil, nil)

      {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
      activity_id = activity.id

      assert_receive {:text, event}
      assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
      assert %{"id" => ^activity_id} = Jason.decode!(payload)

      {:ok, _} = CommonAPI.delete(activity.id, user)

      # Deletions carry the bare activity id as their payload.
      assert_receive {:text, event}
      assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
    end

    test "handles deletions" do
      %{user: user, token: oauth_token} = oauth_access(["read"])
      other_user = insert(:user)
      {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})

      Streamer.get_topic_and_add_socket("public", user, oauth_token)

      {:ok, _} = CommonAPI.delete(activity.id, other_user)
      activity_id = activity.id

      assert_receive {:text, event}
      assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
    end

    test "it streams edits in the 'public' stream" do
      sender = insert(:user)

      Streamer.get_topic_and_add_socket("public", nil, nil)

      {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
      # Consume the message produced by the initial post.
      assert_receive {:text, _}

      {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})

      edited = Pleroma.Activity.normalize(edited)

      %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])

      assert_receive {:text, event}
      assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
      assert %{"id" => ^activity_id} = Jason.decode!(payload)
      refute Streamer.filtered_by_user?(sender, edited)
    end

    test "it streams multiple edits in the 'public' stream correctly" do
      sender = insert(:user)

      Streamer.get_topic_and_add_socket("public", nil, nil)

      {:ok, activity} = CommonAPI.post(sender, %{status: "hey"})
      # Consume the message produced by the initial post.
      assert_receive {:text, _}

      # First edit.
      {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew"})

      edited = Pleroma.Activity.normalize(edited)

      %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])

      assert_receive {:text, event}
      assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
      assert %{"id" => ^activity_id} = Jason.decode!(payload)
      refute Streamer.filtered_by_user?(sender, edited)

      # Second edit: the payload must carry the latest content.
      {:ok, edited} = CommonAPI.update(sender, activity, %{status: "mew mew 2"})

      edited = Pleroma.Activity.normalize(edited)

      %{id: activity_id} = Pleroma.Activity.get_create_by_object_ap_id(edited.object.data["id"])

      assert_receive {:text, event}
      assert %{"event" => "status.update", "payload" => payload} = Jason.decode!(event)
      assert %{"id" => ^activity_id, "content" => "mew mew 2"} = Jason.decode!(payload)
      refute Streamer.filtered_by_user?(sender, edited)
    end
  end
  # How the `[:instance, :skip_thread_containment]` setting and the per-user
  # `skip_thread_containment` flag affect filtering of an activity whose
  # recipient list ("TEST-FFF") matches no real actor.
  describe "thread_containment/2" do
    test "it filters to user if recipients invalid and thread containment is enabled" do
      clear_config([:instance, :skip_thread_containment], false)
      author = insert(:user)
      %{user: user, token: oauth_token} = oauth_access(["read"])
      # Match the result so a failed follow aborts the test immediately.
      {:ok, _, _} = User.follow(user, author, :follow_accept)

      activity =
        insert(:note_activity,
          note:
            insert(:note,
              user: author,
              data: %{"to" => ["TEST-FFF"]}
            )
        )

      Streamer.get_topic_and_add_socket("public", user, oauth_token)
      Streamer.stream("public", activity)

      # The item is still handed to the socket; filtering is decided per user.
      assert_receive {:render_with_user, _, _, ^activity, _}
      assert Streamer.filtered_by_user?(user, activity)
    end

    test "it sends message if recipients invalid and thread containment is disabled" do
      clear_config([:instance, :skip_thread_containment], true)
      author = insert(:user)
      %{user: user, token: oauth_token} = oauth_access(["read"])
      {:ok, _, _} = User.follow(user, author, :follow_accept)

      activity =
        insert(:note_activity,
          note:
            insert(:note,
              user: author,
              data: %{"to" => ["TEST-FFF"]}
            )
        )

      Streamer.get_topic_and_add_socket("public", user, oauth_token)
      Streamer.stream("public", activity)

      assert_receive {:render_with_user, _, _, ^activity, _}
      refute Streamer.filtered_by_user?(user, activity)
    end

    test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
      clear_config([:instance, :skip_thread_containment], false)
      author = insert(:user)
      # This user opts out of thread containment individually.
      user = insert(:user, skip_thread_containment: true)
      %{token: oauth_token} = oauth_access(["read"], user: user)
      {:ok, _, _} = User.follow(user, author, :follow_accept)

      activity =
        insert(:note_activity,
          note:
            insert(:note,
              user: author,
              data: %{"to" => ["TEST-FFF"]}
            )
        )

      Streamer.get_topic_and_add_socket("public", user, oauth_token)
      Streamer.stream("public", activity)

      assert_receive {:render_with_user, _, _, ^activity, _}
      refute Streamer.filtered_by_user?(user, activity)
    end
  end
  # Filtering of public-stream items that involve a user the subscriber has
  # blocked, either as author or as a mentioned party.
  describe "blocks" do
    setup do: oauth_access(["read"])

    test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
      blocked = insert(:user)
      {:ok, _relationship} = User.block(user, blocked)

      Streamer.get_topic_and_add_socket("public", user, oauth_token)

      {:ok, post} = CommonAPI.post(blocked, %{status: "Test"})

      # The item reaches the socket; the per-user filter marks it as hidden.
      assert_receive {:render_with_user, _, _, ^post, _}
      assert Streamer.filtered_by_user?(user, post)
    end

    test "it filters messages transitively involving blocked users", %{
      user: blocker,
      token: blocker_token
    } do
      blockee = insert(:user)
      friend = insert(:user)

      Streamer.get_topic_and_add_socket("public", blocker, blocker_token)

      {:ok, _relationship} = User.block(blocker, blockee)

      # A friend mentioning the blocked user.
      {:ok, friend_mentions_blockee} =
        CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})

      assert_receive {:render_with_user, _, _, ^friend_mentions_blockee, _}
      assert Streamer.filtered_by_user?(blocker, friend_mentions_blockee)

      # The blocked user mentioning the friend.
      {:ok, blockee_mentions_friend} =
        CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})

      assert_receive {:render_with_user, _, _, ^blockee_mentions_friend, _}
      assert Streamer.filtered_by_user?(blocker, blockee_mentions_friend)

      # The blocked user mentioning the blocker directly.
      {:ok, blockee_mentions_blocker} =
        CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})

      assert_receive {:render_with_user, _, _, ^blockee_mentions_blocker, _}
      assert Streamer.filtered_by_user?(blocker, blockee_mentions_blocker)
    end
  end
  # Delivery to a list stream, depending on post visibility and on whether
  # the list owner follows the author.
  describe "lists" do
    setup do: oauth_access(["read"])

    test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
      user_b = insert(:user)
      user_c = insert(:user)

      {:ok, user_a, user_b} = User.follow(user_a, user_b)

      {:ok, list} = List.create("Test", user_a)
      {:ok, list} = List.follow(list, user_b)

      list_params = %{"list" => list.id}
      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, list_params)

      # A direct message from B to C.
      dm = %{status: "@#{user_c.nickname} Test", visibility: "direct"}
      {:ok, _activity} = CommonAPI.post(user_b, dm)

      refute_receive _
    end

    test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
      user_b = insert(:user)

      # No follow from A to B in this test.
      {:ok, list} = List.create("Test", user_a)
      {:ok, list} = List.follow(list, user_b)

      list_params = %{"list" => list.id}
      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, list_params)

      private_post = %{status: "Test", visibility: "private"}
      {:ok, _activity} = CommonAPI.post(user_b, private_post)

      refute_receive _
    end

    test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
      user_b = insert(:user)

      {:ok, user_a, user_b} = User.follow(user_a, user_b)

      {:ok, list} = List.create("Test", user_a)
      {:ok, list} = List.follow(list, user_b)

      list_params = %{"list" => list.id}
      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, list_params)

      private_post = %{status: "Test", visibility: "private"}
      {:ok, activity} = CommonAPI.post(user_b, private_post)

      assert_receive {:render_with_user, _, _, ^activity, _}
      refute Streamer.filtered_by_user?(user_a, activity)
    end
  end
  # Behaviour of the 'user' stream after the subscriber has hidden reblogs
  # from an account they follow.
  describe "muted reblogs" do
    setup do: oauth_access(["read"])

    test "it filters muted reblogs", %{user: user1, token: user1_token} do
      user2 = insert(:user)
      user3 = insert(:user)
      # Match the follow result so a failed precondition aborts the test.
      {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user1, user2)
      CommonAPI.hide_reblogs(user1, user2)

      {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})

      Streamer.get_topic_and_add_socket("user", user1, user1_token)
      {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)

      assert_receive {:render_with_user, _, _, ^announce_activity, _}
      assert Streamer.filtered_by_user?(user1, announce_activity)
    end

    test "it filters reblog notification for reblog-muted actors", %{
      user: user1,
      token: user1_token
    } do
      user2 = insert(:user)
      {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user1, user2)
      CommonAPI.hide_reblogs(user1, user2)

      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})

      Streamer.get_topic_and_add_socket("user", user1, user1_token)
      {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)

      assert_receive {:render_with_user, _, "notification.json", notif, _}
      assert Streamer.filtered_by_user?(user1, notif)
    end

    test "it sends non-reblog notification for reblog-muted actors", %{
      user: user1,
      token: user1_token
    } do
      user2 = insert(:user)
      {:ok, _follower, _followed, _follow_activity} = CommonAPI.follow(user1, user2)
      CommonAPI.hide_reblogs(user1, user2)

      {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})

      Streamer.get_topic_and_add_socket("user", user1, user1_token)
      # A favourite is not a reblog, so its notification is not filtered.
      {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)

      assert_receive {:render_with_user, _, "notification.json", notif, _}
      refute Streamer.filtered_by_user?(user1, notif)
    end
  end
  describe "muted threads" do
    test "it filters posts from muted threads" do
      author = insert(:user)
      %{user: listener, token: listener_token} = oauth_access(["read"])

      Streamer.get_topic_and_add_socket("user", listener, listener_token)

      # Continue with the user structs handed back by follow/2 rather than the
      # ones created above.
      {:ok, listener, author, _follow_activity} = CommonAPI.follow(listener, author)

      post_params = %{status: "super hot take"}
      {:ok, post} = CommonAPI.post(author, post_params)
      {:ok, _} = CommonAPI.add_mute(listener, post)

      # The post was streamed to this process, yet it counts as filtered for
      # the listener once the thread is muted.
      assert_receive {:render_with_user, _, _, ^post, _}
      assert Streamer.filtered_by_user?(listener, post)
    end
  end
  describe "direct streams" do
    # Each test in this group receives a fresh user and a "read"-scoped token
    # through the test context.
    setup do
      oauth_access(["read"])
    end

    test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
      sender = insert(:user)

      Streamer.get_topic_and_add_socket("direct", user, oauth_token)

      dm_params = %{
        status: "hey @#{user.nickname}",
        visibility: "direct"
      }

      {:ok, _dm} = CommonAPI.post(sender, dm_params)

      assert_receive {:text, raw_event}

      # The event payload is itself a JSON-encoded string, hence the second decode.
      event = Jason.decode!(raw_event)
      assert %{"event" => "conversation", "payload" => raw_payload} = event

      payload = Jason.decode!(raw_payload)
      assert %{"last_status" => last_status} = payload

      [participation] = Participation.for_user(user)

      assert last_status["pleroma"]["direct_conversation_id"] == participation.id
    end

    test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
         %{user: user, token: oauth_token} do
      sender = insert(:user)

      Streamer.get_topic_and_add_socket("direct", user, oauth_token)

      dm_params = %{
        status: "hi @#{user.nickname}",
        visibility: "direct"
      }

      {:ok, dm} = CommonAPI.post(sender, dm_params)
      dm_id = dm.id

      assert_receive {:render_with_user, _, _, ^dm, _}

      assert_receive {:text, raw_conversation}
      conversation_event = Jason.decode!(raw_conversation)
      assert %{"event" => "conversation", "payload" => _} = conversation_event

      {:ok, _} = CommonAPI.delete(dm_id, sender)

      assert_receive {:text, raw_delete}
      delete_event = Jason.decode!(raw_delete)
      assert %{"event" => "delete", "payload" => ^dm_id} = delete_event

      # No further message of any kind may follow the delete event.
      refute_receive _
    end

    @tag :erratic
    test "it sends conversation update to the 'direct' stream when a message is deleted", %{
      user: user,
      token: oauth_token
    } do
      sender = insert(:user)

      Streamer.get_topic_and_add_socket("direct", user, oauth_token)

      first_params = %{
        status: "hi @#{user.nickname}",
        visibility: "direct"
      }

      {:ok, first_dm} = CommonAPI.post(sender, first_params)

      reply_params = %{
        status: "hi @#{user.nickname} 2",
        in_reply_to_status_id: first_dm.id,
        visibility: "direct"
      }

      {:ok, reply_dm} = CommonAPI.post(sender, reply_params)

      assert_receive {:render_with_user, _, _, ^first_dm, _}
      assert_receive {:render_with_user, _, _, ^reply_dm, _}

      # One conversation event is expected per message posted above.
      assert_receive {:text, raw_conversation1}
      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(raw_conversation1)

      assert_receive {:text, raw_conversation2}
      assert %{"event" => "conversation", "payload" => _} = Jason.decode!(raw_conversation2)

      {:ok, _} = CommonAPI.delete(reply_dm.id, sender)

      assert_receive {:text, raw_delete}
      assert %{"event" => "delete", "payload" => _} = Jason.decode!(raw_delete)

      assert_receive {:text, raw_update}
      update_event = Jason.decode!(raw_update)
      assert %{"event" => "conversation", "payload" => raw_payload} = update_event

      # After the reply is deleted, the first message is the last status again.
      assert %{"last_status" => last_status} = Jason.decode!(raw_payload)
      assert last_status["id"] == to_string(first_dm.id)
    end
  end
  describe "stop streaming if token got revoked" do
    # Provides the building blocks for the tests below as closures:
    #
    #   * child_proc - wraps a subscribe function and a final check into the body of a
    #     task: subscribe, wait for `{StreamerTest, :ready}`, expect an "update.json"
    #     render message, wait for `{StreamerTest, :revoked}`, then run the final check
    #   * starter    - builds the subscribe function for a user/token pair
    #   * hit / miss - final checks asserting / refuting that `:close` was received
    #   * send_all   - sends one message to the process of every task in a list
    setup do
      child_proc = fn subscribe, final_check ->
        fn ->
          subscribe.()

          receive do
            {StreamerTest, :ready} ->
              assert_receive {:render_with_user, _, "update.json", _, _}

              receive do
                {StreamerTest, :revoked} -> final_check.()
              end
          end
        end
      end

      starter = fn user, token ->
        fn -> Streamer.get_topic_and_add_socket("user", user, token) end
      end

      send_all = fn tasks, message ->
        Enum.each(tasks, fn task -> send(task.pid, message) end)
      end

      %{
        child_proc: child_proc,
        starter: starter,
        hit: fn -> assert_receive :close end,
        miss: fn -> refute_receive :close end,
        send_all: send_all
      }
    end

    test "do not revoke other tokens", %{
      child_proc: child_proc,
      starter: starter,
      hit: hit,
      miss: miss,
      send_all: send_all
    } do
      %{user: user, token: token} = oauth_access(["read"])
      %{token: token2} = oauth_access(["read"], user: user)
      %{user: user2, token: user2_token} = oauth_access(["read"])

      post_user = insert(:user)
      CommonAPI.follow(user, post_user)
      CommonAPI.follow(user2, post_user)

      # Only the stream opened with the revoked token should see `:close`; the same
      # user's second token and the other user's token must be unaffected.
      tasks =
        [
          {user, token, hit},
          {user, token2, miss},
          {user2, user2_token, miss}
        ]
        |> Enum.map(fn {owner, owner_token, final_check} ->
          Task.async(child_proc.(starter.(owner, owner_token), final_check))
        end)

      post_params = %{status: "hi"}
      {:ok, _} = CommonAPI.post(post_user, post_params)

      send_all.(tasks, {StreamerTest, :ready})

      Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)

      send_all.(tasks, {StreamerTest, :revoked})

      Enum.each(tasks, fn task -> Task.await(task) end)
    end

    test "revoke all streams for this token", %{
      child_proc: child_proc,
      starter: starter,
      hit: hit,
      send_all: send_all
    } do
      %{user: user, token: token} = oauth_access(["read"])

      post_user = insert(:user)
      CommonAPI.follow(user, post_user)

      # Two independent streams opened with the very same token.
      tasks =
        for _ <- 1..2 do
          Task.async(child_proc.(starter.(user, token), hit))
        end

      post_params = %{status: "hi"}
      {:ok, _} = CommonAPI.post(post_user, post_params)

      send_all.(tasks, {StreamerTest, :ready})

      Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)

      send_all.(tasks, {StreamerTest, :revoked})

      Enum.each(tasks, fn task -> Task.await(task) end)
    end
  end
  832. end