logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma. Clone with: git clone https://hacktivis.me/git/pleroma.git

reverse_proxy.ex (12825B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  defmodule Pleroma.ReverseProxy do
    @range_headers ~w(range if-range)
    @keep_req_headers ~w(accept accept-encoding cache-control if-modified-since) ++
                        ~w(if-unmodified-since if-none-match) ++ @range_headers
    @resp_cache_headers ~w(etag date last-modified)
    @keep_resp_headers @resp_cache_headers ++
                         ~w(content-type content-disposition content-encoding) ++
                         ~w(content-range accept-ranges vary)
    @default_cache_control_header "public, max-age=1209600"
    @valid_resp_codes [200, 206, 304]
    @max_read_duration :timer.seconds(30)
    @max_body_length :infinity
    @failed_request_ttl :timer.seconds(60)
    @methods ~w(GET HEAD)

    # Read from config at compile time; presumably lets the Cachex implementation be
    # substituted (e.g. in tests) — confirm against the config files.
    @cachex Pleroma.Config.get([:cachex, :provider], Cachex)

    @moduledoc """
    A reverse proxy.

        Pleroma.ReverseProxy.call(conn, url, options)

    It is not meant to be added into a plug pipeline, but to be called from another plug or
    controller.

    Supports `#{inspect(@methods)}` HTTP methods, and only allows
    `#{inspect(@valid_resp_codes)}` status codes.

    Responses are chunked to the client while downloading from the upstream.

    Some request / response headers are preserved:

    * request: `#{inspect(@keep_req_headers)}`
    * response: `#{inspect(@keep_resp_headers)}`

    Options:

    * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if
      there are any HTTP errors. Any error during body processing will not be redirected as the
      response is chunked. This may expose the remote URL, client IPs, etc.
    * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to
      be approximately the specified length. It is validated with the `content-length` header
      and also verified while proxying; a body of exactly the limit is accepted.
    * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the
      connection is allowed to read from the remote upstream. A value that is not a positive
      integer (e.g. `:infinity`) disables the limit.
    * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed
      request is cached and cannot be retried.
    * `inline_content_types`:
      * `true` will not alter `content-disposition` (up to the upstream),
      * `false` will add `content-disposition: attachment` to any request,
      * a list of whitelisted content types
    * `req_headers`, `resp_headers`: additional headers as `{name, value}` string pairs. They
      replace preserved headers with exactly the same (lowercase) name.
    * `http`: options for [hackney](https://github.com/benoitc/hackney) or
      [gun](https://github.com/ninenines/gun).
    """

    @doc "Default value, in milliseconds, of the `max_read_duration` option."
    def max_read_duration_default, do: @max_read_duration

    @doc "The `cache-control` value set on every proxied response."
    def default_cache_control_header, do: @default_cache_control_header

    @default_options [pool: :media]

    @inline_content_types [
      "image/gif",
      "image/jpeg",
      "image/jpg",
      "image/png",
      "image/svg+xml",
      "audio/mpeg",
      "audio/mp3",
      "video/webm",
      "video/mp4",
      "video/quicktime"
    ]

    require Logger
    import Plug.Conn

    @type option() ::
            {:max_read_duration, non_neg_integer() | :infinity}
            | {:max_body_length, non_neg_integer() | :infinity}
            | {:failed_request_ttl, non_neg_integer() | :infinity}
            | {:http, keyword()}
            | {:req_headers, [{String.t(), String.t()}]}
            | {:resp_headers, [{String.t(), String.t()}]}
            | {:inline_content_types, boolean() | list(String.t())}
            | {:redirect_on_failure, boolean()}

    @doc """
    Proxies `url` into `conn` and returns the halted connection.

    Only `GET` and `HEAD` are proxied; any other method gets a `400` response. URLs that
    recently failed are served an error (or redirect) without contacting the upstream.
    """
    @spec call(Plug.Conn.t(), String.t(), list(option())) :: Plug.Conn.t()
    def call(_conn, _url, _opts \\ [])

    def call(conn = %{method: method}, url, opts) when method in @methods do
      client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
      req_headers = build_req_headers(conn.req_headers, opts)

      # Fallback attachment name for when upstream gives none in content-disposition.
      opts =
        if filename = Pleroma.Web.MediaProxy.filename(url) do
          Keyword.put_new(opts, :attachment_name, filename)
        else
          opts
        end

      with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
           {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
           :ok <- check_header_length(headers, client, opts) do
        response(conn, client, url, code, headers, opts)
      else
        {:ok, true} ->
          conn
          |> error_or_redirect(url, 500, "Request failed", opts)
          |> halt()

        {:ok, code, headers} ->
          conn
          |> head_response(url, code, headers, opts)
          |> halt()

        {:error, {:invalid_http_response, code}} ->
          Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
          track_failed_url(url, code, opts)

          conn
          |> error_or_redirect(
            url,
            code,
            "Request failed: " <> Plug.Conn.Status.reason_phrase(code),
            opts
          )
          |> halt()

        {:error, error} ->
          Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
          track_failed_url(url, error, opts)

          conn
          |> error_or_redirect(url, 500, "Request failed", opts)
          |> halt()
      end
    end

    def call(conn, _, _) do
      conn
      |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
      |> halt()
    end

    # Enforces `max_body_length` against the upstream content-length header. On rejection the
    # upstream connection is closed before the error is returned. Otherwise the body would
    # never be read and the client handle would be leaked.
    defp check_header_length(headers, client, opts) do
      case header_length_constraint(
             headers,
             Keyword.get(opts, :max_body_length, @max_body_length)
           ) do
        :ok ->
          :ok

        error ->
          client().close(client)
          error
      end
    end

    # Issues the upstream request. Response header names are lowercased. Statuses outside
    # @valid_resp_codes are turned into {:error, {:invalid_http_response, code}}.
    # The 3-tuple form (no client handle) is what the HEAD branch of call/3 handles.
    defp request(method, url, headers, opts) do
      Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
      # Safe: method is restricted to @methods by call/3's guard.
      method = method |> String.downcase() |> String.to_existing_atom()

      case client().request(method, url, headers, "", opts) do
        {:ok, code, headers, client} when code in @valid_resp_codes ->
          {:ok, code, downcase_headers(headers), client}

        {:ok, code, headers} when code in @valid_resp_codes ->
          {:ok, code, downcase_headers(headers)}

        {:ok, code, _, _} ->
          {:error, {:invalid_http_response, code}}

        {:ok, code, _} ->
          {:error, {:invalid_http_response, code}}

        {:error, error} ->
          {:error, error}
      end
    end

    # Sends the filtered headers, then streams the body. The upstream client is closed
    # if streaming stops with an error.
    defp response(conn, client, url, status, headers, opts) do
      Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

      result =
        conn
        |> put_resp_headers(build_resp_headers(headers, opts))
        |> send_chunked(status)
        |> chunk_reply(client, opts)

      case result do
        {:ok, conn} ->
          halt(conn)

        {:error, :closed, conn} ->
          client().close(client)
          halt(conn)

        {:error, error, conn} ->
          Logger.warning(
            "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
          )

          client().close(client)
          halt(conn)
      end
    end

    defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

    # Streams the upstream body chunk by chunk. It accumulates the bytes sent and the time
    # spent reading, so both limits apply to the whole body rather than to each chunk.
    defp chunk_reply(conn, client, opts, sent_so_far, duration) do
      with {:ok, duration} <-
             check_read_duration(
               duration,
               Keyword.get(opts, :max_read_duration, @max_read_duration)
             ),
           {:ok, data, client} <- client().stream_body(client),
           {:ok, duration} <- increase_read_duration(duration),
           sent_so_far = sent_so_far + byte_size(data),
           :ok <-
             body_size_constraint(
               sent_so_far,
               Keyword.get(opts, :max_body_length, @max_body_length)
             ),
           {:ok, conn} <- chunk(conn, data) do
        chunk_reply(conn, client, opts, sent_so_far, duration)
      else
        :done -> {:ok, conn}
        {:error, error} -> {:error, error, conn}
      end
    end

    defp head_response(conn, url, code, headers, opts) do
      Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

      conn
      |> put_resp_headers(build_resp_headers(headers, opts))
      |> send_resp(code, "")
    end

    defp error_or_redirect(conn, url, code, body, opts) do
      if Keyword.get(opts, :redirect_on_failure, false) do
        conn
        |> Phoenix.Controller.redirect(external: url)
        |> halt()
      else
        conn
        |> send_resp(code, body)
        |> halt()
      end
    end

    defp downcase_headers(headers) do
      Enum.map(headers, fn {k, v} ->
        {String.downcase(k), v}
      end)
    end

    # Media type without parameters (e.g. "image/png" from "image/png; charset=x").
    # Defaults to "application/octet-stream" when the header is missing.
    defp get_content_type(headers) do
      {_, content_type} =
        List.keyfind(headers, "content-type", 0, {"content-type", "application/octet-stream"})

      [content_type | _] = String.split(content_type, ";")
      content_type
    end

    defp put_resp_headers(conn, headers) do
      Enum.reduce(headers, conn, fn {k, v}, conn ->
        put_resp_header(conn, k, v)
      end)
    end

    # Keyword.merge/2 raises ArgumentError on non-atom keys, and header names are strings.
    # The semantics are the same: entries in `extra` replace same-named entries in `headers`
    # and are appended at the end.
    defp merge_headers(headers, []), do: headers

    defp merge_headers(headers, extra) do
      Enum.reject(headers, fn {name, _} -> List.keymember?(extra, name, 0) end) ++ extra
    end

    defp build_req_headers(headers, opts) do
      headers
      |> downcase_headers()
      |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
      |> build_req_range_or_encoding_header(opts)
      |> build_req_user_agent_header(opts)
      |> merge_headers(Keyword.get(opts, :req_headers, []))
    end

    # Disable content-encoding if any @range_headers are requested (see #1823).
    defp build_req_range_or_encoding_header(headers, _opts) do
      range? = Enum.any?(headers, fn {header, _} -> Enum.member?(@range_headers, header) end)

      if range? && List.keymember?(headers, "accept-encoding", 0) do
        List.keydelete(headers, "accept-encoding", 0)
      else
        headers
      end
    end

    defp build_req_user_agent_header(headers, _opts) do
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    end

    defp build_resp_headers(headers, opts) do
      headers
      |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
      |> build_resp_cache_headers(opts)
      |> build_resp_content_disposition_header(opts)
      |> merge_headers(Keyword.get(opts, :resp_headers, []))
    end

    # Always sets our own cache-control. "cache-control" is not in @keep_resp_headers, so
    # no upstream value survives filtering, and Plug would otherwise default to
    # "max-age=0, private, must-revalidate".
    defp build_resp_cache_headers(headers, _opts) do
      List.keystore(
        headers,
        "cache-control",
        0,
        {"cache-control", @default_cache_control_header}
      )
    end

    # Forces `content-disposition: attachment` unless the content type may be shown inline
    # according to the `inline_content_types` option. The attachment name is taken from the
    # upstream's quoted filename. If there is none, it falls back to the :attachment_name
    # option, then to "attachment".
    defp build_resp_content_disposition_header(headers, opts) do
      opt = Keyword.get(opts, :inline_content_types, @inline_content_types)
      content_type = get_content_type(headers)

      attachment? =
        cond do
          is_list(opt) -> content_type not in opt
          opt == false -> true
          true -> false
        end

      if attachment? do
        name = upstream_filename(headers) || Keyword.get(opts, :attachment_name, "attachment")
        disposition = "attachment; filename=\"#{name}\""
        List.keystore(headers, "content-disposition", 0, {"content-disposition", disposition})
      else
        headers
      end
    end

    # Returns the quoted `filename="..."` value from the upstream content-disposition header,
    # or nil when the header or the quoted filename is absent.
    defp upstream_filename(headers) do
      with {_, value} when is_binary(value) <- List.keyfind(headers, "content-disposition", 0),
           [name | _] <-
             Regex.run(~r/filename="((?:[^"\\]|\\.)*)"/u, value, capture: :all_but_first) do
        name
      else
        _ -> nil
      end
    end

    defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
      with {_, size} <- List.keyfind(headers, "content-length", 0),
           {size, _} <- Integer.parse(size),
           true <- size <= limit do
        :ok
      else
        false ->
          {:error, :body_too_large}

        _ ->
          :ok
      end
    end

    defp header_length_constraint(_, _), do: :ok

    # Rejects only bodies strictly larger than the limit. This matches
    # header_length_constraint/2, which accepts content-length == limit.
    defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
      {:error, :body_too_large}
    end

    defp body_size_constraint(_, _), do: :ok

    # With a positive integer limit, returns {:ok, {elapsed_ms, started_at_ms}} so that
    # increase_read_duration/1 can add the time spent in the next read.
    defp check_read_duration(duration, max)
         when is_integer(duration) and is_integer(max) and max > 0 do
      if duration > max do
        {:error, :read_duration_exceeded}
      else
        {:ok, {duration, :erlang.system_time(:millisecond)}}
      end
    end

    # No usable limit (e.g. :infinity). This must be a 2-tuple to match `{:ok, duration} <-`
    # in chunk_reply/5; the marker is then threaded through increase_read_duration/1.
    defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

    defp increase_read_duration({previous_duration, started})
         when is_integer(previous_duration) and is_integer(started) do
      duration = :erlang.system_time(:millisecond) - started
      {:ok, previous_duration + duration}
    end

    defp increase_read_duration(:no_duration_limit), do: {:ok, :no_duration_limit}

    defp client, do: Pleroma.ReverseProxy.Client.Wrapper

    # Marks `url` as failed so later calls short-circuit. `:body_too_large`, 400 and 204 are
    # stored with `ttl: nil` (presumably no expiry in Cachex — confirm). Every other failure
    # expires after `failed_request_ttl`.
    defp track_failed_url(url, error, opts) do
      ttl =
        if error in [:body_too_large, 400, 204] do
          nil
        else
          Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
        end

      @cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
    end
  end