ping_test.exs (851B)
1 # Pleroma: A lightweight social networking server 2 # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> 3 # SPDX-License-Identifier: AGPL-3.0-only 4 5 defmodule Pleroma.Web.PingTest do 6 use Pleroma.DataCase 7 8 import Pleroma.Factory 9 alias Pleroma.Web.Streamer 10 11 setup do 12 start_supervised({Streamer.supervisor(), [ping_interval: 30]}) 13 14 :ok 15 end 16 17 describe "sockets" do 18 setup do 19 user = insert(:user) 20 {:ok, %{user: user}} 21 end 22 23 test "it sends pings", %{user: user} do 24 task = 25 Task.async(fn -> 26 assert_receive {:text, received_event}, 40 27 assert_receive {:text, received_event}, 40 28 assert_receive {:text, received_event}, 40 29 end) 30 31 Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}}) 32 33 Task.await(task) 34 end 35 end 36 end