logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma

fed_socket.ex (3763B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.FedSockets.FedSocket do
  5. @moduledoc """
  6. The FedSocket module abstracts the actions to be taken taken on connections regardless of
  7. whether the connection started as inbound or outbound.
  8. Normally outside modules will have no need to call the FedSocket module directly.
  9. """
  10. alias Pleroma.Object
  11. alias Pleroma.Object.Containment
  12. alias Pleroma.User
  13. alias Pleroma.Web.ActivityPub.ObjectView
  14. alias Pleroma.Web.ActivityPub.UserView
  15. alias Pleroma.Web.ActivityPub.Visibility
  16. alias Pleroma.Web.FedSockets.FetchRegistry
  17. alias Pleroma.Web.FedSockets.IngesterWorker
  18. alias Pleroma.Web.FedSockets.OutgoingHandler
  19. alias Pleroma.Web.FedSockets.SocketInfo
  20. require Logger
  21. @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
  22. def connect_to_host(uri) do
  23. case OutgoingHandler.start_link(uri) do
  24. {:ok, pid} ->
  25. {:ok, pid}
  26. error ->
  27. {:error, error}
  28. end
  29. end
  30. def close(%SocketInfo{pid: socket_pid}),
  31. do: Process.send(socket_pid, :close, [])
  32. def publish(%SocketInfo{pid: socket_pid}, json) do
  33. %{action: :publish, data: json}
  34. |> Jason.encode!()
  35. |> send_packet(socket_pid)
  36. end
  37. def fetch(%SocketInfo{pid: socket_pid}, id) do
  38. fetch_uuid = FetchRegistry.register_fetch(id)
  39. %{action: :fetch, data: id, uuid: fetch_uuid}
  40. |> Jason.encode!()
  41. |> send_packet(socket_pid)
  42. wait_for_fetch_to_return(fetch_uuid, 0)
  43. end
  44. def receive_package(%SocketInfo{} = fed_socket, json) do
  45. json
  46. |> Jason.decode!()
  47. |> process_package(fed_socket)
  48. end
  49. defp wait_for_fetch_to_return(uuid, cntr) do
  50. case FetchRegistry.check_fetch(uuid) do
  51. {:error, :waiting} ->
  52. Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
  53. wait_for_fetch_to_return(uuid, cntr + 1)
  54. {:error, :missing} ->
  55. Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
  56. {:error, :timeout}
  57. {:ok, _fr} ->
  58. FetchRegistry.pop_fetch(uuid)
  59. end
  60. end
  61. defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
  62. if Containment.contain_origin(origin, data) do
  63. IngesterWorker.enqueue("ingest", %{"object" => data})
  64. end
  65. {:reply, %{"action" => "publish_reply", "status" => "processed"}}
  66. end
  67. defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
  68. FetchRegistry.register_fetch_received(uuid, data)
  69. {:noreply, nil}
  70. end
  71. defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
  72. {:ok, data} = render_fetched_data(ap_id, uuid)
  73. {:reply, data}
  74. end
  75. defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
  76. {:noreply, nil}
  77. end
  78. defp process_package(other, _fed_socket) do
  79. Logger.warn("unknown json packages received #{inspect(other)}")
  80. {:noreply, nil}
  81. end
  82. defp render_fetched_data(ap_id, uuid) do
  83. {:ok,
  84. %{
  85. "action" => "fetch_reply",
  86. "status" => "processed",
  87. "uuid" => uuid,
  88. "data" => represent_item(ap_id)
  89. }}
  90. end
  91. defp represent_item(ap_id) do
  92. case User.get_by_ap_id(ap_id) do
  93. nil ->
  94. object = Object.get_cached_by_ap_id(ap_id)
  95. if Visibility.is_public?(object) do
  96. Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
  97. else
  98. nil
  99. end
  100. user ->
  101. Phoenix.View.render_to_string(UserView, "user.json", user: user)
  102. end
  103. end
  104. defp send_packet(data, socket_pid) do
  105. Process.send(socket_pid, {:send, data}, [])
  106. end
  107. def shake, do: @shake
  108. end