logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma

streamer_test.exs (13706B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      5 defmodule Pleroma.Web.StreamerTest do
      6   use Pleroma.DataCase
      7 
      8   alias Pleroma.List
      9   alias Pleroma.User
     10   alias Pleroma.Web.CommonAPI
     11   alias Pleroma.Web.Streamer
     12   import Pleroma.Factory
     13 
     14   clear_config_all([:instance, :skip_thread_containment])
     15 
     16   describe "user streams" do
     17     setup do
     18       GenServer.start(Streamer, %{}, name: Streamer)
     19 
     20       on_exit(fn ->
     21         if pid = Process.whereis(Streamer) do
     22           Process.exit(pid, :kill)
     23         end
     24       end)
     25 
     26       user = insert(:user)
     27       notify = insert(:notification, user: user, activity: build(:note_activity))
     28       {:ok, %{user: user, notify: notify}}
     29     end
     30 
     31     test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
     32       task =
     33         Task.async(fn ->
     34           assert_receive {:text, _}, 4_000
     35         end)
     36 
     37       Streamer.add_socket(
     38         "user",
     39         %{transport_pid: task.pid, assigns: %{user: user}}
     40       )
     41 
     42       Streamer.stream("user", notify)
     43       Task.await(task)
     44     end
     45 
     46     test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
     47       task =
     48         Task.async(fn ->
     49           assert_receive {:text, _}, 4_000
     50         end)
     51 
     52       Streamer.add_socket(
     53         "user:notification",
     54         %{transport_pid: task.pid, assigns: %{user: user}}
     55       )
     56 
     57       Streamer.stream("user:notification", notify)
     58       Task.await(task)
     59     end
     60 
     61     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
     62       user: user
     63     } do
     64       blocked = insert(:user)
     65       {:ok, user} = User.block(user, blocked)
     66 
     67       task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
     68 
     69       Streamer.add_socket(
     70         "user:notification",
     71         %{transport_pid: task.pid, assigns: %{user: user}}
     72       )
     73 
     74       {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
     75       {:ok, notif, _} = CommonAPI.favorite(activity.id, blocked)
     76 
     77       Streamer.stream("user:notification", notif)
     78       Task.await(task)
     79     end
     80 
     81     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
     82       user: user
     83     } do
     84       user2 = insert(:user)
     85       task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
     86 
     87       Streamer.add_socket(
     88         "user:notification",
     89         %{transport_pid: task.pid, assigns: %{user: user}}
     90       )
     91 
     92       {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
     93       {:ok, activity} = CommonAPI.add_mute(user, activity)
     94       {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
     95       Streamer.stream("user:notification", notif)
     96       Task.await(task)
     97     end
     98 
     99     test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
    100       user: user
    101     } do
    102       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
    103       task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    104 
    105       Streamer.add_socket(
    106         "user:notification",
    107         %{transport_pid: task.pid, assigns: %{user: user}}
    108       )
    109 
    110       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
    111       {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    112       {:ok, notif, _} = CommonAPI.favorite(activity.id, user2)
    113 
    114       Streamer.stream("user:notification", notif)
    115       Task.await(task)
    116     end
    117   end
    118 
    119   test "it sends to public" do
    120     user = insert(:user)
    121     other_user = insert(:user)
    122 
    123     task =
    124       Task.async(fn ->
    125         assert_receive {:text, _}, 4_000
    126       end)
    127 
    128     fake_socket = %{
    129       transport_pid: task.pid,
    130       assigns: %{
    131         user: user
    132       }
    133     }
    134 
    135     {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
    136 
    137     topics = %{
    138       "public" => [fake_socket]
    139     }
    140 
    141     Streamer.push_to_socket(topics, "public", activity)
    142 
    143     Task.await(task)
    144 
    145     task =
    146       Task.async(fn ->
    147         expected_event =
    148           %{
    149             "event" => "delete",
    150             "payload" => activity.id
    151           }
    152           |> Jason.encode!()
    153 
    154         assert_receive {:text, received_event}, 4_000
    155         assert received_event == expected_event
    156       end)
    157 
    158     fake_socket = %{
    159       transport_pid: task.pid,
    160       assigns: %{
    161         user: user
    162       }
    163     }
    164 
    165     {:ok, activity} = CommonAPI.delete(activity.id, other_user)
    166 
    167     topics = %{
    168       "public" => [fake_socket]
    169     }
    170 
    171     Streamer.push_to_socket(topics, "public", activity)
    172 
    173     Task.await(task)
    174   end
    175 
    176   describe "thread_containment" do
    177     test "it doesn't send to user if recipients invalid and thread containment is enabled" do
    178       Pleroma.Config.put([:instance, :skip_thread_containment], false)
    179       author = insert(:user)
    180       user = insert(:user, following: [author.ap_id])
    181 
    182       activity =
    183         insert(:note_activity,
    184           note:
    185             insert(:note,
    186               user: author,
    187               data: %{"to" => ["TEST-FFF"]}
    188             )
    189         )
    190 
    191       task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
    192       fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
    193       topics = %{"public" => [fake_socket]}
    194       Streamer.push_to_socket(topics, "public", activity)
    195 
    196       Task.await(task)
    197     end
    198 
    199     test "it sends message if recipients invalid and thread containment is disabled" do
    200       Pleroma.Config.put([:instance, :skip_thread_containment], true)
    201       author = insert(:user)
    202       user = insert(:user, following: [author.ap_id])
    203 
    204       activity =
    205         insert(:note_activity,
    206           note:
    207             insert(:note,
    208               user: author,
    209               data: %{"to" => ["TEST-FFF"]}
    210             )
    211         )
    212 
    213       task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    214       fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
    215       topics = %{"public" => [fake_socket]}
    216       Streamer.push_to_socket(topics, "public", activity)
    217 
    218       Task.await(task)
    219     end
    220 
    221     test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
    222       Pleroma.Config.put([:instance, :skip_thread_containment], false)
    223       author = insert(:user)
    224       user = insert(:user, following: [author.ap_id], info: %{skip_thread_containment: true})
    225 
    226       activity =
    227         insert(:note_activity,
    228           note:
    229             insert(:note,
    230               user: author,
    231               data: %{"to" => ["TEST-FFF"]}
    232             )
    233         )
    234 
    235       task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
    236       fake_socket = %{transport_pid: task.pid, assigns: %{user: user}}
    237       topics = %{"public" => [fake_socket]}
    238       Streamer.push_to_socket(topics, "public", activity)
    239 
    240       Task.await(task)
    241     end
    242   end
    243 
    244   test "it doesn't send to blocked users" do
    245     user = insert(:user)
    246     blocked_user = insert(:user)
    247     {:ok, user} = User.block(user, blocked_user)
    248 
    249     task =
    250       Task.async(fn ->
    251         refute_receive {:text, _}, 1_000
    252       end)
    253 
    254     fake_socket = %{
    255       transport_pid: task.pid,
    256       assigns: %{
    257         user: user
    258       }
    259     }
    260 
    261     {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
    262 
    263     topics = %{
    264       "public" => [fake_socket]
    265     }
    266 
    267     Streamer.push_to_socket(topics, "public", activity)
    268 
    269     Task.await(task)
    270   end
    271 
    272   test "it doesn't send unwanted DMs to list" do
    273     user_a = insert(:user)
    274     user_b = insert(:user)
    275     user_c = insert(:user)
    276 
    277     {:ok, user_a} = User.follow(user_a, user_b)
    278 
    279     {:ok, list} = List.create("Test", user_a)
    280     {:ok, list} = List.follow(list, user_b)
    281 
    282     task =
    283       Task.async(fn ->
    284         refute_receive {:text, _}, 1_000
    285       end)
    286 
    287     fake_socket = %{
    288       transport_pid: task.pid,
    289       assigns: %{
    290         user: user_a
    291       }
    292     }
    293 
    294     {:ok, activity} =
    295       CommonAPI.post(user_b, %{
    296         "status" => "@#{user_c.nickname} Test",
    297         "visibility" => "direct"
    298       })
    299 
    300     topics = %{
    301       "list:#{list.id}" => [fake_socket]
    302     }
    303 
    304     Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
    305 
    306     Task.await(task)
    307   end
    308 
    309   test "it doesn't send unwanted private posts to list" do
    310     user_a = insert(:user)
    311     user_b = insert(:user)
    312 
    313     {:ok, list} = List.create("Test", user_a)
    314     {:ok, list} = List.follow(list, user_b)
    315 
    316     task =
    317       Task.async(fn ->
    318         refute_receive {:text, _}, 1_000
    319       end)
    320 
    321     fake_socket = %{
    322       transport_pid: task.pid,
    323       assigns: %{
    324         user: user_a
    325       }
    326     }
    327 
    328     {:ok, activity} =
    329       CommonAPI.post(user_b, %{
    330         "status" => "Test",
    331         "visibility" => "private"
    332       })
    333 
    334     topics = %{
    335       "list:#{list.id}" => [fake_socket]
    336     }
    337 
    338     Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
    339 
    340     Task.await(task)
    341   end
    342 
    343   test "it send wanted private posts to list" do
    344     user_a = insert(:user)
    345     user_b = insert(:user)
    346 
    347     {:ok, user_a} = User.follow(user_a, user_b)
    348 
    349     {:ok, list} = List.create("Test", user_a)
    350     {:ok, list} = List.follow(list, user_b)
    351 
    352     task =
    353       Task.async(fn ->
    354         assert_receive {:text, _}, 1_000
    355       end)
    356 
    357     fake_socket = %{
    358       transport_pid: task.pid,
    359       assigns: %{
    360         user: user_a
    361       }
    362     }
    363 
    364     {:ok, activity} =
    365       CommonAPI.post(user_b, %{
    366         "status" => "Test",
    367         "visibility" => "private"
    368       })
    369 
    370     topics = %{
    371       "list:#{list.id}" => [fake_socket]
    372     }
    373 
    374     Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
    375 
    376     Task.await(task)
    377   end
    378 
    379   test "it doesn't send muted reblogs" do
    380     user1 = insert(:user)
    381     user2 = insert(:user)
    382     user3 = insert(:user)
    383     CommonAPI.hide_reblogs(user1, user2)
    384 
    385     task =
    386       Task.async(fn ->
    387         refute_receive {:text, _}, 1_000
    388       end)
    389 
    390     fake_socket = %{
    391       transport_pid: task.pid,
    392       assigns: %{
    393         user: user1
    394       }
    395     }
    396 
    397     {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
    398     {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
    399 
    400     topics = %{
    401       "public" => [fake_socket]
    402     }
    403 
    404     Streamer.push_to_socket(topics, "public", announce_activity)
    405 
    406     Task.await(task)
    407   end
    408 
    409   test "it doesn't send posts from muted threads" do
    410     user = insert(:user)
    411     user2 = insert(:user)
    412     {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
    413 
    414     {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
    415 
    416     {:ok, activity} = CommonAPI.add_mute(user2, activity)
    417 
    418     task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
    419 
    420     Streamer.add_socket(
    421       "user",
    422       %{transport_pid: task.pid, assigns: %{user: user2}}
    423     )
    424 
    425     Streamer.stream("user", activity)
    426     Task.await(task)
    427   end
    428 
    429   describe "direct streams" do
    430     setup do
    431       GenServer.start(Streamer, %{}, name: Streamer)
    432 
    433       on_exit(fn ->
    434         if pid = Process.whereis(Streamer) do
    435           Process.exit(pid, :kill)
    436         end
    437       end)
    438 
    439       :ok
    440     end
    441 
    442     test "it sends conversation update to the 'direct' stream", %{} do
    443       user = insert(:user)
    444       another_user = insert(:user)
    445 
    446       task =
    447         Task.async(fn ->
    448           assert_receive {:text, _received_event}, 4_000
    449         end)
    450 
    451       Streamer.add_socket(
    452         "direct",
    453         %{transport_pid: task.pid, assigns: %{user: user}}
    454       )
    455 
    456       {:ok, _create_activity} =
    457         CommonAPI.post(another_user, %{
    458           "status" => "hey @#{user.nickname}",
    459           "visibility" => "direct"
    460         })
    461 
    462       Task.await(task)
    463     end
    464 
    465     test "it doesn't send conversation update to the 'direct' streamj when the last message in the conversation is deleted" do
    466       user = insert(:user)
    467       another_user = insert(:user)
    468 
    469       {:ok, create_activity} =
    470         CommonAPI.post(another_user, %{
    471           "status" => "hi @#{user.nickname}",
    472           "visibility" => "direct"
    473         })
    474 
    475       task =
    476         Task.async(fn ->
    477           assert_receive {:text, received_event}, 4_000
    478           assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
    479 
    480           refute_receive {:text, _}, 4_000
    481         end)
    482 
    483       Streamer.add_socket(
    484         "direct",
    485         %{transport_pid: task.pid, assigns: %{user: user}}
    486       )
    487 
    488       {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
    489 
    490       Task.await(task)
    491     end
    492 
    493     test "it sends conversation update to the 'direct' stream when a message is deleted" do
    494       user = insert(:user)
    495       another_user = insert(:user)
    496 
    497       {:ok, create_activity} =
    498         CommonAPI.post(another_user, %{
    499           "status" => "hi @#{user.nickname}",
    500           "visibility" => "direct"
    501         })
    502 
    503       {:ok, create_activity2} =
    504         CommonAPI.post(another_user, %{
    505           "status" => "hi @#{user.nickname}",
    506           "in_reply_to_status_id" => create_activity.id,
    507           "visibility" => "direct"
    508         })
    509 
    510       task =
    511         Task.async(fn ->
    512           assert_receive {:text, received_event}, 4_000
    513           assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
    514 
    515           assert_receive {:text, received_event}, 4_000
    516 
    517           assert %{"event" => "conversation", "payload" => received_payload} =
    518                    Jason.decode!(received_event)
    519 
    520           assert %{"last_status" => last_status} = Jason.decode!(received_payload)
    521           assert last_status["id"] == to_string(create_activity.id)
    522         end)
    523 
    524       Streamer.add_socket(
    525         "direct",
    526         %{transport_pid: task.pid, assigns: %{user: user}}
    527       )
    528 
    529       {:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
    530 
    531       Task.await(task)
    532     end
    533   end
    534 end