logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

mastodon_websocket_test.exs (15345B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Integration.MastodonWebsocketTest do
  5. # Needs a streamer, needs to stay synchronous
  6. use Pleroma.DataCase
  7. import ExUnit.CaptureLog
  8. import Pleroma.Factory
  9. alias Pleroma.Integration.WebsocketClient
  10. alias Pleroma.Web.CommonAPI
  11. alias Pleroma.Web.OAuth
  12. @moduletag needs_streamer: true, capture_log: true
  13. @path Pleroma.Web.Endpoint.url()
  14. |> URI.parse()
  15. |> Map.put(:scheme, "ws")
  16. |> Map.put(:path, "/api/v1/streaming")
  17. |> URI.to_string()
  18. def start_socket(qs \\ nil, headers \\ []) do
  19. path =
  20. case qs do
  21. nil -> @path
  22. qs -> @path <> qs
  23. end
  24. WebsocketClient.start_link(self(), path, headers)
  25. end
  26. defp decode_json(json) do
  27. with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
  28. {:ok, payload} <- Jason.decode(payload_text) do
  29. {:ok, %{"event" => event, "payload" => payload}}
  30. end
  31. end
  32. # Turns atom keys to strings
  33. defp atom_key_to_string(json) do
  34. json
  35. |> Jason.encode!()
  36. |> Jason.decode!()
  37. end
  38. test "refuses invalid requests" do
  39. capture_log(fn ->
  40. assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
  41. Process.sleep(30)
  42. end)
  43. end
  44. test "requires authentication and a valid token for protected streams" do
  45. capture_log(fn ->
  46. assert {:error, %WebSockex.RequestError{code: 401}} =
  47. start_socket("?stream=user&access_token=aaaaaaaaaaaa")
  48. assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
  49. Process.sleep(30)
  50. end)
  51. end
  52. test "allows unified stream" do
  53. assert {:ok, _} = start_socket()
  54. end
  55. test "allows public streams without authentication" do
  56. assert {:ok, _} = start_socket("?stream=public")
  57. assert {:ok, _} = start_socket("?stream=public:local")
  58. assert {:ok, _} = start_socket("?stream=public:remote&instance=lain.com")
  59. assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
  60. end
  61. test "receives well formatted events" do
  62. user = insert(:user)
  63. {:ok, _} = start_socket("?stream=public")
  64. {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
  65. assert_receive {:text, raw_json}, 1_000
  66. assert {:ok, json} = Jason.decode(raw_json)
  67. assert "update" == json["event"]
  68. assert json["payload"]
  69. assert {:ok, json} = Jason.decode(json["payload"])
  70. view_json =
  71. Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
  72. |> atom_key_to_string()
  73. assert json == view_json
  74. end
  75. describe "subscribing via WebSocket" do
  76. test "can subscribe" do
  77. user = insert(:user)
  78. {:ok, pid} = start_socket()
  79. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
  80. assert_receive {:text, raw_json}, 1_000
  81. assert {:ok,
  82. %{
  83. "event" => "pleroma:respond",
  84. "payload" => %{"type" => "subscribe", "result" => "success"}
  85. }} = decode_json(raw_json)
  86. {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
  87. assert_receive {:text, raw_json}, 1_000
  88. assert {:ok, json} = Jason.decode(raw_json)
  89. assert "update" == json["event"]
  90. assert json["payload"]
  91. assert {:ok, json} = Jason.decode(json["payload"])
  92. view_json =
  93. Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
  94. |> Jason.encode!()
  95. |> Jason.decode!()
  96. assert json == view_json
  97. end
  98. test "can subscribe to multiple streams" do
  99. user = insert(:user)
  100. {:ok, pid} = start_socket()
  101. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
  102. assert_receive {:text, raw_json}, 1_000
  103. assert {:ok,
  104. %{
  105. "event" => "pleroma:respond",
  106. "payload" => %{"type" => "subscribe", "result" => "success"}
  107. }} = decode_json(raw_json)
  108. WebsocketClient.send_text(
  109. pid,
  110. %{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()
  111. )
  112. assert_receive {:text, raw_json}, 1_000
  113. assert {:ok,
  114. %{
  115. "event" => "pleroma:respond",
  116. "payload" => %{"type" => "subscribe", "result" => "success"}
  117. }} = decode_json(raw_json)
  118. {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber #mew"})
  119. assert_receive {:text, raw_json}, 1_000
  120. assert {:ok, %{"stream" => stream1}} = Jason.decode(raw_json)
  121. assert_receive {:text, raw_json}, 1_000
  122. assert {:ok, %{"stream" => stream2}} = Jason.decode(raw_json)
  123. streams = [stream1, stream2]
  124. assert ["hashtag", "mew"] in streams
  125. assert ["public"] in streams
  126. end
  127. test "won't double subscribe" do
  128. user = insert(:user)
  129. {:ok, pid} = start_socket()
  130. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
  131. assert_receive {:text, raw_json}, 1_000
  132. assert {:ok,
  133. %{
  134. "event" => "pleroma:respond",
  135. "payload" => %{"type" => "subscribe", "result" => "success"}
  136. }} = decode_json(raw_json)
  137. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
  138. assert_receive {:text, raw_json}, 1_000
  139. assert {:ok,
  140. %{
  141. "event" => "pleroma:respond",
  142. "payload" => %{"type" => "subscribe", "result" => "ignored"}
  143. }} = decode_json(raw_json)
  144. {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
  145. assert_receive {:text, _}, 1_000
  146. refute_receive {:text, _}, 1_000
  147. end
  148. test "rejects invalid streams" do
  149. {:ok, pid} = start_socket()
  150. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "nonsense"} |> Jason.encode!())
  151. assert_receive {:text, raw_json}, 1_000
  152. assert {:ok,
  153. %{
  154. "event" => "pleroma:respond",
  155. "payload" => %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"}
  156. }} = decode_json(raw_json)
  157. end
  158. test "can unsubscribe" do
  159. user = insert(:user)
  160. {:ok, pid} = start_socket()
  161. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
  162. assert_receive {:text, raw_json}, 1_000
  163. assert {:ok,
  164. %{
  165. "event" => "pleroma:respond",
  166. "payload" => %{"type" => "subscribe", "result" => "success"}
  167. }} = decode_json(raw_json)
  168. WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
  169. assert_receive {:text, raw_json}, 1_000
  170. assert {:ok,
  171. %{
  172. "event" => "pleroma:respond",
  173. "payload" => %{"type" => "unsubscribe", "result" => "success"}
  174. }} = decode_json(raw_json)
  175. {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
  176. refute_receive {:text, _}, 1_000
  177. end
  178. end
  179. describe "with a valid user token" do
  180. setup do
  181. {:ok, app} =
  182. Pleroma.Repo.insert(
  183. OAuth.App.register_changeset(%OAuth.App{}, %{
  184. client_name: "client",
  185. scopes: ["read"],
  186. redirect_uris: "url"
  187. })
  188. )
  189. user = insert(:user)
  190. {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
  191. {:ok, token} = OAuth.Token.exchange_token(app, auth)
  192. %{app: app, user: user, token: token}
  193. end
  194. test "accepts valid tokens", state do
  195. assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
  196. end
  197. test "accepts the 'user' stream", %{token: token} = _state do
  198. assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
  199. capture_log(fn ->
  200. assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
  201. Process.sleep(30)
  202. end)
  203. end
  204. test "accepts the 'user:notification' stream", %{token: token} = _state do
  205. assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
  206. capture_log(fn ->
  207. assert {:error, %WebSockex.RequestError{code: 401}} =
  208. start_socket("?stream=user:notification")
  209. Process.sleep(30)
  210. end)
  211. end
  212. test "accepts valid token on client-sent event", %{token: token} do
  213. assert {:ok, pid} = start_socket()
  214. WebsocketClient.send_text(
  215. pid,
  216. %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
  217. )
  218. assert_receive {:text, raw_json}, 1_000
  219. assert {:ok,
  220. %{
  221. "event" => "pleroma:respond",
  222. "payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
  223. }} = decode_json(raw_json)
  224. WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!())
  225. assert_receive {:text, raw_json}, 1_000
  226. assert {:ok,
  227. %{
  228. "event" => "pleroma:respond",
  229. "payload" => %{"type" => "subscribe", "result" => "success"}
  230. }} = decode_json(raw_json)
  231. end
  232. test "rejects invalid token on client-sent event" do
  233. assert {:ok, pid} = start_socket()
  234. WebsocketClient.send_text(
  235. pid,
  236. %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
  237. )
  238. assert_receive {:text, raw_json}, 1_000
  239. assert {:ok,
  240. %{
  241. "event" => "pleroma:respond",
  242. "payload" => %{
  243. "type" => "pleroma:authenticate",
  244. "result" => "error",
  245. "error" => "unauthorized"
  246. }
  247. }} = decode_json(raw_json)
  248. end
  249. test "rejects new authenticate request if already logged-in", %{token: token} do
  250. assert {:ok, pid} = start_socket()
  251. WebsocketClient.send_text(
  252. pid,
  253. %{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
  254. )
  255. assert_receive {:text, raw_json}, 1_000
  256. assert {:ok,
  257. %{
  258. "event" => "pleroma:respond",
  259. "payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
  260. }} = decode_json(raw_json)
  261. WebsocketClient.send_text(
  262. pid,
  263. %{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
  264. )
  265. assert_receive {:text, raw_json}, 1_000
  266. assert {:ok,
  267. %{
  268. "event" => "pleroma:respond",
  269. "payload" => %{
  270. "type" => "pleroma:authenticate",
  271. "result" => "error",
  272. "error" => "already_authenticated"
  273. }
  274. }} = decode_json(raw_json)
  275. end
  276. test "accepts the 'list' stream", %{token: token, user: user} do
  277. posting_user = insert(:user)
  278. {:ok, list} = Pleroma.List.create("test", user)
  279. Pleroma.List.follow(list, posting_user)
  280. assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}")
  281. assert {:ok, pid} = start_socket("?access_token=#{token.token}")
  282. WebsocketClient.send_text(
  283. pid,
  284. %{type: "subscribe", stream: "list", list: list.id} |> Jason.encode!()
  285. )
  286. assert_receive {:text, raw_json}, 1_000
  287. assert {:ok,
  288. %{
  289. "event" => "pleroma:respond",
  290. "payload" => %{"type" => "subscribe", "result" => "success"}
  291. }} = decode_json(raw_json)
  292. WebsocketClient.send_text(
  293. pid,
  294. %{type: "subscribe", stream: "list", list: to_string(list.id)} |> Jason.encode!()
  295. )
  296. assert_receive {:text, raw_json}, 1_000
  297. assert {:ok,
  298. %{
  299. "event" => "pleroma:respond",
  300. "payload" => %{"type" => "subscribe", "result" => "ignored"}
  301. }} = decode_json(raw_json)
  302. end
  303. test "disconnect when token is revoked", %{app: app, user: user, token: token} do
  304. assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
  305. assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
  306. {:ok, auth} = OAuth.Authorization.create_authorization(app, user)
  307. {:ok, token2} = OAuth.Token.exchange_token(app, auth)
  308. assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")
  309. OAuth.Token.Strategy.Revoke.revoke(token)
  310. assert_receive {:close, _}
  311. assert_receive {:close, _}
  312. refute_receive {:close, _}
  313. end
  314. test "receives private statuses", %{user: reading_user, token: token} do
  315. user = insert(:user)
  316. CommonAPI.follow(reading_user, user)
  317. {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
  318. {:ok, activity} =
  319. CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
  320. assert_receive {:text, raw_json}, 1_000
  321. assert {:ok, json} = Jason.decode(raw_json)
  322. assert "update" == json["event"]
  323. assert json["payload"]
  324. assert {:ok, json} = Jason.decode(json["payload"])
  325. view_json =
  326. Pleroma.Web.MastodonAPI.StatusView.render("show.json",
  327. activity: activity,
  328. for: reading_user
  329. )
  330. |> Jason.encode!()
  331. |> Jason.decode!()
  332. assert json == view_json
  333. end
  334. test "receives edits", %{user: reading_user, token: token} do
  335. user = insert(:user)
  336. CommonAPI.follow(reading_user, user)
  337. {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
  338. {:ok, activity} =
  339. CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
  340. assert_receive {:text, _raw_json}, 1_000
  341. {:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"})
  342. assert_receive {:text, raw_json}, 1_000
  343. activity = Pleroma.Activity.normalize(activity)
  344. view_json =
  345. Pleroma.Web.MastodonAPI.StatusView.render("show.json",
  346. activity: activity,
  347. for: reading_user
  348. )
  349. |> Jason.encode!()
  350. |> Jason.decode!()
  351. assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
  352. end
  353. test "receives notifications", %{user: reading_user, token: token} do
  354. user = insert(:user)
  355. CommonAPI.follow(reading_user, user)
  356. {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
  357. {:ok, %Pleroma.Activity{id: activity_id} = _activity} =
  358. CommonAPI.post(user, %{
  359. status: "nice echo chamber @#{reading_user.nickname}",
  360. visibility: "private"
  361. })
  362. assert_receive {:text, raw_json}, 1_000
  363. assert {:ok,
  364. %{
  365. "event" => "notification",
  366. "payload" => %{
  367. "status" => %{
  368. "id" => ^activity_id
  369. }
  370. }
  371. }} = decode_json(raw_json)
  372. end
  373. end
  374. end