logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

mastodon_websocket_test.exs (15752B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Integration.MastodonWebsocketTest do
  5. # Needs a streamer, needs to stay synchronous
  6. use Pleroma.DataCase
  7. import ExUnit.CaptureLog
  8. import Pleroma.Factory
  9. alias Pleroma.Integration.WebsocketClient
  10. alias Pleroma.Web.CommonAPI
  11. alias Pleroma.Web.OAuth
  12. @moduletag needs_streamer: true, capture_log: true
  # WebSocket URL of the streaming API: the endpoint URL with its scheme set
  # to "ws" and its path set to "/api/v1/streaming". As a module attribute it
  # is evaluated once, when this module is compiled.
  @path Pleroma.Web.Endpoint.url()
        |> URI.parse()
        |> struct!(scheme: "ws", path: "/api/v1/streaming")
        |> URI.to_string()
  # Opens a WebSocket connection to the streaming endpoint through
  # `WebsocketClient.start_link/3`, handing it the calling process (`self()`),
  # the target URL and any extra request headers.
  #
  # `qs` is an optional query string appended verbatim to `@path` (callers in
  # this file pass it with the leading "?"); with the default `nil` the bare
  # `@path` is used. Returns whatever `WebsocketClient.start_link/3` returns.
  def start_socket(qs \\ nil, headers \\ []) do
    url = if is_nil(qs), do: @path, else: @path <> qs

    WebsocketClient.start_link(self(), url, headers)
  end
  # Decodes a streaming frame whose "payload" field is itself a JSON-encoded
  # string. On success returns `{:ok, map}` holding only the "event" name and
  # the decoded "payload"; any other key of the frame is dropped. When either
  # decoding step does not match, the non-matching value is returned as-is.
  defp decode_json(json) do
    with {:ok, %{"event" => name, "payload" => encoded}} <- Jason.decode(json),
         {:ok, decoded} <- Jason.decode(encoded) do
      {:ok, %{"event" => name, "payload" => decoded}}
    end
  end
  # Round-trips a term through JSON so that it compares equal to decoded JSON
  # (for instance, atom map keys come back as strings).
  defp atom_key_to_string(json) do
    Jason.decode!(Jason.encode!(json))
  end
  test "refuses invalid requests" do
    capture_log(fn ->
      # An unknown stream name must be answered with a 404 on the handshake.
      result = start_socket("?stream=ncjdk")
      assert {:error, %WebSockex.RequestError{code: 404}} = result

      # Short pause while the log is still captured — presumably gives the
      # server side time to finish logging the rejection; confirm before removing.
      Process.sleep(30)
    end)
  end
  test "requires authentication and a valid token for protected streams" do
    capture_log(fn ->
      # Both a bogus token and a missing token must be rejected with a 401.
      with_bogus_token = start_socket("?stream=user&access_token=aaaaaaaaaaaa")
      assert {:error, %WebSockex.RequestError{code: 401}} = with_bogus_token

      without_token = start_socket("?stream=user")
      assert {:error, %WebSockex.RequestError{code: 401}} = without_token

      Process.sleep(30)
    end)
  end
  test "allows unified stream" do
    # No query string: connect to the bare streaming endpoint.
    result = start_socket()
    assert {:ok, _} = result
  end
  test "allows public streams without authentication" do
    # Every one of these streams must accept a connection without a token.
    public_queries = [
      "?stream=public",
      "?stream=public:local",
      "?stream=public:remote&instance=lain.com",
      "?stream=hashtag&tag=lain"
    ]

    Enum.each(public_queries, fn query ->
      assert {:ok, _} = start_socket(query)
    end)
  end
  test "receives well formatted events" do
    author = insert(:user)
    {:ok, _} = start_socket("?stream=public")
    {:ok, activity} = CommonAPI.post(author, %{status: "nice echo chamber"})

    # The frame is a JSON envelope whose "payload" is itself a JSON string.
    assert_receive {:text, frame}, 1_000
    assert {:ok, envelope} = Jason.decode(frame)
    assert "update" == envelope["event"]
    assert envelope["payload"]
    assert {:ok, status} = Jason.decode(envelope["payload"])

    # The payload must equal the status view rendered for an anonymous reader.
    expected =
      "show.json"
      |> Pleroma.Web.MastodonAPI.StatusView.render(activity: activity, for: nil)
      |> atom_key_to_string()

    assert status == expected
  end
  75. describe "subscribing via WebSocket" do
  test "can subscribe" do
    user = insert(:user)
    {:ok, pid} = start_socket()

    # Subscribe to the public timeline over the already-open socket.
    WebsocketClient.send_text(pid, Jason.encode!(%{type: "subscribe", stream: "public"}))
    assert_receive {:text, raw_json}, 1_000

    assert {:ok,
            %{
              "event" => "pleroma:respond",
              "payload" => %{"type" => "subscribe", "result" => "success"}
            }} = decode_json(raw_json)

    # A new post must now be delivered as an "update" event.
    {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
    assert_receive {:text, raw_json}, 1_000
    assert {:ok, json} = Jason.decode(raw_json)
    assert "update" == json["event"]
    assert json["payload"]
    assert {:ok, json} = Jason.decode(json["payload"])

    # Use the shared helper for the JSON round-trip instead of repeating
    # the encode!/decode! pipeline inline.
    view_json =
      "show.json"
      |> Pleroma.Web.MastodonAPI.StatusView.render(activity: activity, for: nil)
      |> atom_key_to_string()

    assert json == view_json
  end
  test "can subscribe to multiple streams" do
    author = insert(:user)
    {:ok, socket} = start_socket()

    # First subscription: the public timeline.
    WebsocketClient.send_text(socket, Jason.encode!(%{type: "subscribe", stream: "public"}))
    assert_receive {:text, public_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => public_ack}} =
             decode_json(public_reply)

    assert %{"type" => "subscribe", "result" => "success"} = public_ack

    # Second subscription on the same socket: the "mew" hashtag stream.
    hashtag_request = Jason.encode!(%{type: "subscribe", stream: "hashtag", tag: "mew"})
    WebsocketClient.send_text(socket, hashtag_request)
    assert_receive {:text, hashtag_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => hashtag_ack}} =
             decode_json(hashtag_reply)

    assert %{"type" => "subscribe", "result" => "success"} = hashtag_ack

    # A single post matching both subscriptions yields one frame per stream;
    # the order of the two frames is not asserted.
    {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber #mew"})

    assert_receive {:text, first_frame}, 1_000
    assert {:ok, %{"stream" => first_stream}} = Jason.decode(first_frame)
    assert_receive {:text, second_frame}, 1_000
    assert {:ok, %{"stream" => second_stream}} = Jason.decode(second_frame)

    received_streams = [first_stream, second_stream]
    assert ["hashtag", "mew"] in received_streams
    assert ["public"] in received_streams
  end
  test "won't double subscribe" do
    author = insert(:user)
    {:ok, socket} = start_socket()
    subscribe_public = Jason.encode!(%{type: "subscribe", stream: "public"})

    # The first request is accepted.
    WebsocketClient.send_text(socket, subscribe_public)
    assert_receive {:text, first_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => first_ack}} =
             decode_json(first_reply)

    assert %{"type" => "subscribe", "result" => "success"} = first_ack

    # Repeating the identical request is answered with "ignored".
    WebsocketClient.send_text(socket, subscribe_public)
    assert_receive {:text, second_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => second_ack}} =
             decode_json(second_reply)

    assert %{"type" => "subscribe", "result" => "ignored"} = second_ack

    # The post must arrive exactly once: one frame, then silence.
    {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber"})
    assert_receive {:text, _}, 1_000
    refute_receive {:text, _}, 1_000
  end
  test "rejects invalid streams" do
    {:ok, socket} = start_socket()

    # An unknown stream name is answered with a "bad_topic" error response.
    request = Jason.encode!(%{type: "subscribe", stream: "nonsense"})
    WebsocketClient.send_text(socket, request)
    assert_receive {:text, reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => payload}} = decode_json(reply)
    assert %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"} = payload
  end
  test "can unsubscribe" do
    author = insert(:user)
    {:ok, socket} = start_socket()

    # Subscribe to the public timeline...
    WebsocketClient.send_text(socket, Jason.encode!(%{type: "subscribe", stream: "public"}))
    assert_receive {:text, subscribe_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => subscribe_ack}} =
             decode_json(subscribe_reply)

    assert %{"type" => "subscribe", "result" => "success"} = subscribe_ack

    # ...then unsubscribe from it again.
    WebsocketClient.send_text(socket, Jason.encode!(%{type: "unsubscribe", stream: "public"}))
    assert_receive {:text, unsubscribe_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => unsubscribe_ack}} =
             decode_json(unsubscribe_reply)

    assert %{"type" => "unsubscribe", "result" => "success"} = unsubscribe_ack

    # After unsubscribing, a new post must not reach this socket.
    {:ok, _activity} = CommonAPI.post(author, %{status: "nice echo chamber"})
    refute_receive {:text, _}, 1_000
  end
  178. end
  179. describe "with a valid user token" do
  # Registers an OAuth app with the "read" scope, creates a user, and exchanges
  # an authorization for a token. The app, user and token are exposed to every
  # test in this describe block through the test context.
  setup do
    app_changeset =
      OAuth.App.register_changeset(%OAuth.App{}, %{
        client_name: "client",
        scopes: ["read"],
        redirect_uris: "url"
      })

    {:ok, app} = Pleroma.Repo.insert(app_changeset)
    user = insert(:user)
    {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
    {:ok, token} = OAuth.Token.exchange_token(app, auth)

    %{app: app, user: user, token: token}
  end
  test "accepts valid tokens", %{token: token} do
    # The token created in setup must open the protected "user" stream.
    result = start_socket("?stream=user&access_token=#{token.token}")
    assert {:ok, _} = result
  end
  test "accepts the 'user' stream", %{token: token} do
    assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

    # Without a token the same stream must be refused with a 401.
    capture_log(fn ->
      anonymous = start_socket("?stream=user")
      assert {:error, %WebSockex.RequestError{code: 401}} = anonymous
      Process.sleep(30)
    end)
  end
  test "accepts the 'user:notification' stream", %{token: token} do
    assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")

    # Without a token the same stream must be refused with a 401.
    capture_log(fn ->
      anonymous = start_socket("?stream=user:notification")
      assert {:error, %WebSockex.RequestError{code: 401}} = anonymous
      Process.sleep(30)
    end)
  end
  test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
    # The token may be supplied in the Sec-WebSocket-Protocol header instead
    # of the query string.
    valid_headers = [{"Sec-WebSocket-Protocol", token.token}]
    assert {:ok, _} = start_socket("?stream=user", valid_headers)

    # A header value that is not a valid token is refused with a 401.
    capture_log(fn ->
      bogus_headers = [{"Sec-WebSocket-Protocol", "I am a friend"}]
      rejected = start_socket("?stream=user", bogus_headers)
      assert {:error, %WebSockex.RequestError{code: 401}} = rejected
      Process.sleep(30)
    end)
  end
  test "accepts valid token on client-sent event", %{token: token} do
    assert {:ok, socket} = start_socket()

    # Authenticate after connecting, through a "pleroma:authenticate" event.
    auth_request = Jason.encode!(%{type: "pleroma:authenticate", token: token.token})
    WebsocketClient.send_text(socket, auth_request)
    assert_receive {:text, auth_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => auth_ack}} =
             decode_json(auth_reply)

    assert %{"type" => "pleroma:authenticate", "result" => "success"} = auth_ack

    # Once authenticated, the protected "user" stream can be subscribed to.
    WebsocketClient.send_text(socket, Jason.encode!(%{type: "subscribe", stream: "user"}))
    assert_receive {:text, subscribe_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => subscribe_ack}} =
             decode_json(subscribe_reply)

    assert %{"type" => "subscribe", "result" => "success"} = subscribe_ack
  end
  test "rejects invalid token on client-sent event" do
    assert {:ok, socket} = start_socket()

    # An authenticate event with an unknown token yields an "unauthorized" error.
    auth_request = Jason.encode!(%{type: "pleroma:authenticate", token: "Something else"})
    WebsocketClient.send_text(socket, auth_request)
    assert_receive {:text, reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => payload}} = decode_json(reply)

    assert %{
             "type" => "pleroma:authenticate",
             "result" => "error",
             "error" => "unauthorized"
           } = payload
  end
  test "rejects new authenticate request if already logged-in", %{token: token} do
    assert {:ok, socket} = start_socket()

    # The first authentication, with the valid token, succeeds.
    first_request = Jason.encode!(%{type: "pleroma:authenticate", token: token.token})
    WebsocketClient.send_text(socket, first_request)
    assert_receive {:text, first_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => first_ack}} =
             decode_json(first_reply)

    assert %{"type" => "pleroma:authenticate", "result" => "success"} = first_ack

    # A second authenticate event on the same socket is refused with
    # "already_authenticated".
    second_request = Jason.encode!(%{type: "pleroma:authenticate", token: "Something else"})
    WebsocketClient.send_text(socket, second_request)
    assert_receive {:text, second_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => second_ack}} =
             decode_json(second_reply)

    assert %{
             "type" => "pleroma:authenticate",
             "result" => "error",
             "error" => "already_authenticated"
           } = second_ack
  end
  test "accepts the 'list' stream", %{token: token, user: user} do
    posting_user = insert(:user)
    {:ok, list} = Pleroma.List.create("test", user)
    Pleroma.List.follow(list, posting_user)

    # Subscribing through the query string at connection time.
    assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}")

    # Subscribing through a client-sent event on a socket opened with the token.
    assert {:ok, socket} = start_socket("?access_token=#{token.token}")

    by_raw_id = Jason.encode!(%{type: "subscribe", stream: "list", list: list.id})
    WebsocketClient.send_text(socket, by_raw_id)
    assert_receive {:text, first_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => first_ack}} =
             decode_json(first_reply)

    assert %{"type" => "subscribe", "result" => "success"} = first_ack

    # Same list again, with the id passed through to_string/1: the request
    # is reported as "ignored".
    by_string_id =
      Jason.encode!(%{type: "subscribe", stream: "list", list: to_string(list.id)})

    WebsocketClient.send_text(socket, by_string_id)
    assert_receive {:text, second_reply}, 1_000

    assert {:ok, %{"event" => "pleroma:respond", "payload" => second_ack}} =
             decode_json(second_reply)

    assert %{"type" => "subscribe", "result" => "ignored"} = second_ack
  end
  test "disconnect when token is revoked", %{app: app, user: user, token: token} do
    # Two sockets are opened with the token that is going to be revoked...
    assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
    assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

    # ...and a third one with a second token for the same app and user.
    {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
    {:ok, token2} = OAuth.Token.exchange_token(app, auth)
    assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")

    OAuth.Token.Strategy.Revoke.revoke(token)

    # Use the same explicit 1s timeout as every other assert_receive in this
    # file rather than ExUnit's much shorter default, so a slow disconnect
    # does not make the test flaky.
    assert_receive {:close, _}, 1_000
    assert_receive {:close, _}, 1_000

    # Exactly two close messages are expected; a third must not arrive.
    refute_receive {:close, _}
  end
  test "receives private statuses", %{user: reading_user, token: token} do
    user = insert(:user)
    CommonAPI.follow(user, reading_user)

    {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

    {:ok, activity} =
      CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})

    # The private post must be delivered on the reader's "user" stream.
    assert_receive {:text, raw_json}, 1_000
    assert {:ok, json} = Jason.decode(raw_json)
    assert "update" == json["event"]
    assert json["payload"]
    assert {:ok, json} = Jason.decode(json["payload"])

    # Use the shared helper for the JSON round-trip instead of repeating
    # the encode!/decode! pipeline inline.
    view_json =
      "show.json"
      |> Pleroma.Web.MastodonAPI.StatusView.render(activity: activity, for: reading_user)
      |> atom_key_to_string()

    assert json == view_json
  end
  test "receives edits", %{user: reading_user, token: token} do
    user = insert(:user)
    CommonAPI.follow(user, reading_user)

    {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")

    {:ok, activity} =
      CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})

    # Consume the frame for the original post so the next frame is the edit.
    assert_receive {:text, _raw_json}, 1_000

    {:ok, _} = CommonAPI.update(activity, user, %{status: "mew mew", visibility: "private"})
    assert_receive {:text, raw_json}, 1_000

    # Normalize again after the update — presumably so the rendered view
    # reflects the edited status; confirm against Pleroma.Activity.normalize/1.
    activity = Pleroma.Activity.normalize(activity)

    # Use the shared helper for the JSON round-trip instead of repeating
    # the encode!/decode! pipeline inline.
    view_json =
      "show.json"
      |> Pleroma.Web.MastodonAPI.StatusView.render(activity: activity, for: reading_user)
      |> atom_key_to_string()

    assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
  end
  test "receives notifications", %{user: reading_user, token: token} do
    author = insert(:user)
    CommonAPI.follow(author, reading_user)

    {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")

    # A private post mentioning the reader.
    {:ok, %Pleroma.Activity{id: activity_id}} =
      CommonAPI.post(author, %{
        status: "nice echo chamber @#{reading_user.nickname}",
        visibility: "private"
      })

    # The notification frame must reference the status that was just posted.
    assert_receive {:text, frame}, 1_000
    assert {:ok, %{"event" => "notification", "payload" => notification}} = decode_json(frame)
    assert %{"status" => %{"id" => ^activity_id}} = notification
  end
  381. end
  382. end