logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

outgoing_handler.ex (4341B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.FedSockets.OutgoingHandler do
  5. use GenServer
  6. require Logger
  7. alias Pleroma.Application
  8. alias Pleroma.Web.ActivityPub.InternalFetchActor
  9. alias Pleroma.Web.FedSockets
  10. alias Pleroma.Web.FedSockets.FedRegistry
  11. alias Pleroma.Web.FedSockets.FedSocket
  12. alias Pleroma.Web.FedSockets.SocketInfo
  @doc """
  Starts an outgoing-connection handler process for `uri`.

  The process is linked to the caller and started without a registered name.
  `uri` is wrapped in a map and handed to `init/1`, which performs the actual
  connection attempt.
  """
  def start_link(uri), do: GenServer.start_link(__MODULE__, %{uri: uri})
  # GenServer callback. Tries to establish the outgoing connection for the
  # given uri while the process is starting.
  #
  # On success the connection is handed to FedRegistry.add_fed_socket/2 and
  # whatever that call returns becomes the result of init/1.
  # NOTE(review): presumably that is `{:ok, socket_info}` - confirm against
  # FedRegistry, since any other shape is not a valid init/1 return.
  #
  # On failure the reason is logged and `:ignore` is returned, so no process
  # is kept around for a connection that could not be made.
  def init(%{uri: target_uri}) do
    connection = initiate_connection(target_uri)

    case connection do
      {:error, why} ->
        Logger.debug("Outgoing connection failed - #{inspect(why)}")
        :ignore

      {:ok, origin, gun_pid} ->
        FedRegistry.add_fed_socket(origin, gun_pid)
    end
  end
  # handle_info/2 receives everything sent to this process: websocket frames
  # forwarded by gun, `{:send, data}` and `:close` requests, and any other
  # message.

  # A text frame arrived on the websocket. The state is passed through
  # SocketInfo.touch/1, the payload is handed to FedSocket.receive_package/2,
  # and a `{:reply, reply}` result is JSON-encoded and sent back on the same
  # connection. Errors are logged; the process keeps running either way.
  def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
    socket_info = SocketInfo.touch(socket_info)

    case FedSocket.receive_package(socket_info, data) do
      {:noreply, _} ->
        :ok

      {:reply, reply} ->
        :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})

      {:error, reason} ->
        Logger.error("incoming error - receive_package: #{inspect(reason)}")
    end

    {:noreply, socket_info}
  end

  # Request to close the socket. A GenServer callback has to answer with a
  # `{:noreply, ...}` or `{:stop, ...}` tuple; a `{:close, state}` tuple would
  # crash the process with a :bad_return_value error. So: send a websocket
  # close frame when the state carries a connection pid, then stop normally,
  # exactly as the `:gun_down` clause below does.
  def handle_info(:close, state) do
    Logger.debug("Sending close frame !!!!!!!")

    case state do
      %{conn_pid: conn_pid} when is_pid(conn_pid) -> :gun.ws_send(conn_pid, :close)
      _ -> :ok
    end

    {:stop, :normal, state}
  end

  # gun reports that the connection went down with reason :closed - there is
  # nothing left to serve, so stop normally.
  def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
    {:stop, :normal, state}
  end

  # Outgoing payload: pass the state through SocketInfo.touch/1 and push the
  # data to the remote side as a text frame.
  def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
    socket_info = SocketInfo.touch(socket_info)
    :gun.ws_send(conn_pid, {:text, data})
    {:noreply, socket_info}
  end

  # Pong frame: nothing to do, hibernate until the next message.
  def handle_info({:gun_ws, _, _, :pong}, state) do
    {:noreply, state, :hibernate}
  end

  # Catch-all so unexpected messages are logged instead of crashing the
  # process with a FunctionClauseError.
  def handle_info(msg, state) do
    Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
    {:noreply, state}
  end
  # GenServer callback invoked when the process is shutting down. It only
  # records which connection state is going away and why.
  def terminate(reason, state) do
    Logger.debug(fn ->
      "#{__MODULE__} terminating outgoing connection for #{inspect(state)} " <>
        "for #{inspect(reason)}"
    end)

    {:ok, state}
  end
  @doc """
  Opens an outgoing websocket connection for `uri`.

  The websocket address is derived from `uri` via `SocketInfo.origin/1` and
  `FedSockets.uri_for_origin/1`. The function then:

    1. opens a gun connection to that host and port and waits for it to be up,
    2. issues a plain `GET` on the websocket path and expects a `204` reply,
    3. requests the websocket upgrade with the signed headers and waits up to
       15 seconds for the outcome.

  The upgrade outcome is received in the calling process, so the caller
  becomes the owner of the websocket.

  Returns `{:ok, ws_uri, conn_pid}` on success. On failure returns
  `{:error, reason}`, where `reason` is `:fedsockets_not_supported` for a
  `404` reply with a body, `:timeout` when the upgrade is not answered in
  time, `{:upgrade_refused, status}` when the remote answers the upgrade with
  a regular HTTP response, or the unexpected term otherwise. Whenever the
  connection was opened but the handshake failed, the connection is closed
  before returning.
  """
  def initiate_connection(uri) do
    ws_uri =
      uri
      |> SocketInfo.origin()
      |> FedSockets.uri_for_origin()

    %{host: host, port: port, path: path} = URI.parse(ws_uri)

    case :gun.open(to_charlist(host), port, %{protocols: [:http]}) do
      {:ok, conn_pid} ->
        case negotiate_websocket(conn_pid, to_charlist(path), uri) do
          :ok ->
            {:ok, ws_uri, conn_pid}

          {:error, _reason} = error ->
            # Without a completed upgrade the connection is of no use to
            # anyone: close it and drop any of its messages still queued in
            # our mailbox so nothing is leaked.
            :gun.close(conn_pid)
            :gun.flush(conn_pid)
            error
        end

      e ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, e}
    end
  end

  # Runs the handshake on an already opened connection: wait for it to be up,
  # probe the path with a GET (204 expected), then ask for the upgrade.
  # Returns :ok or {:error, reason}.
  defp negotiate_websocket(conn_pid, path, uri) do
    with {:ok, _} <- :gun.await_up(conn_pid),
         reference =
           :gun.get(conn_pid, path, [
             {'user-agent', to_charlist(Application.user_agent())}
           ]),
         {:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
         headers = build_headers(uri),
         ref = :gun.ws_upgrade(conn_pid, path, headers, %{silence_pings: false}) do
      await_upgrade(conn_pid, ref, uri)
    else
      {:response, :nofin, 404, _} ->
        {:error, :fedsockets_not_supported}

      e ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, e}
    end
  end

  # Waits for the outcome of the upgrade request identified by `ref`.
  # Besides the success message, a refusal (regular HTTP response) and a
  # stream error are handled so a failed upgrade is reported right away
  # instead of after the full timeout.
  defp await_upgrade(conn_pid, ref, uri) do
    receive do
      {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
        :ok

      {:gun_response, ^conn_pid, ^ref, _is_fin, status, _headers} ->
        Logger.debug("Fedsocket upgrade refused by #{inspect(uri)} - status #{inspect(status)}")
        {:error, {:upgrade_refused, status}}

      {:gun_error, ^conn_pid, ^ref, reason} ->
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
        {:error, reason}
    after
      15_000 ->
        Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
        {:error, :timeout}
    end
  end
  # Builds the header list sent with the websocket upgrade request.
  #
  # A value obtained from FedSocket.shake/0 is digested with SHA-256 and,
  # together with its byte size, the current signed date and the target host,
  # signed by Pleroma.Signature.sign/2 on behalf of the internal fetch actor.
  # The same values are then emitted as headers (charlists, except the date,
  # which is passed on exactly as Pleroma.Signature.signed_date/0 returned
  # it).
  defp build_headers(uri) do
    signing_host =
      uri
      |> URI.parse()
      |> host_signature()

    handshake = FedSocket.shake()
    body_digest = "SHA-256=" <> Base.encode64(:crypto.hash(:sha256, handshake))
    signed_at = Pleroma.Signature.signed_date()
    content_length = Integer.to_charlist(byte_size(handshake))

    signature =
      Pleroma.Signature.sign(InternalFetchActor.get_actor(), %{
        "(request-target)": handshake,
        "content-length": content_length,
        date: signed_at,
        digest: body_digest,
        host: signing_host
      })

    [
      {'signature', to_charlist(signature)},
      {'date', signed_at},
      {'digest', to_charlist(body_digest)},
      {'content-length', content_length},
      {'(request-target)', to_charlist(handshake)},
      {'user-agent', to_charlist(Application.user_agent())}
    ]
  end
  # Returns the host value used when signing: the bare host when the port is
  # the default one for the scheme, "host:port" otherwise.
  defp host_signature(%{host: host, scheme: scheme, port: port}) do
    case URI.default_port(scheme) do
      default when default == port -> host
      _other -> "#{host}:#{port}"
    end
  end
  124. end