logo

pleroma

My custom branch(es) on git.pleroma.social/pleroma/pleroma

reverse_proxy.ex (13282B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.ReverseProxy do
  # Request headers that ask the upstream for a partial (byte-range) response.
  @range_headers ~w(range if-range)
  # Client request headers that are kept when building the upstream request;
  # everything else is filtered out.
  @keep_req_headers ~w(accept user-agent accept-encoding cache-control if-modified-since) ++
                      ~w(if-unmodified-since if-none-match) ++ @range_headers
  # Caching-related upstream response headers; they are part of @keep_resp_headers.
  @resp_cache_headers ~w(etag date last-modified)
  # Upstream response headers that are kept when building the client response;
  # everything else is filtered out.
  @keep_resp_headers @resp_cache_headers ++
                       ~w(content-length content-type content-disposition content-encoding) ++
                       ~w(content-range accept-ranges vary)
  # cache-control value put on proxied responses (1209600 seconds = 14 days).
  @default_cache_control_header "public, max-age=1209600"
  # Upstream status codes accepted by request/4; any other code is turned into
  # an {:error, {:invalid_http_response, code}} tuple.
  @valid_resp_codes [200, 206, 304]
  # Default for the :max_read_duration option, in milliseconds.
  @max_read_duration :timer.seconds(30)
  # Default for the :max_body_length option; a non-integer value such as
  # :infinity makes the length constraints always pass.
  @max_body_length :infinity
  # Default for the :failed_request_ttl option, in milliseconds.
  @failed_request_ttl :timer.seconds(60)
  # HTTP methods handled by call/3; any other method is answered with a 400.
  @methods ~w(GET HEAD)
  @doc "Returns the default value of the `:max_read_duration` option, in milliseconds."
  def max_read_duration_default do
    @max_read_duration
  end
  @doc "Returns the `cache-control` header value that is put on proxied responses."
  def default_cache_control_header do
    @default_cache_control_header
  end
  20. @moduledoc """
  21. A reverse proxy.
  22. Pleroma.ReverseProxy.call(conn, url, options)
  23. It is not meant to be added into a plug pipeline, but to be called from another plug or controller.
  24. Supports `#{inspect(@methods)}` HTTP methods, and only allows `#{inspect(@valid_resp_codes)}` status codes.
  25. Responses are chunked to the client while downloading from the upstream.
  Some request/response headers are preserved:
  27. * request: `#{inspect(@keep_req_headers)}`
  28. * response: `#{inspect(@keep_resp_headers)}`
  29. Options:
  * `redirect_on_failure` (default `false`). Redirects the client to the real remote URL if there are any HTTP
  errors. Errors during body processing are not redirected because the response is already being chunked. This may
  expose the remote URL, client IPs, ….
  33. * `max_body_length` (default `#{inspect(@max_body_length)}`): limits the content length to be approximately the
  34. specified length. It is validated with the `content-length` header and also verified when proxying.
  35. * `max_read_duration` (default `#{inspect(@max_read_duration)}` ms): the total time the connection is allowed to
  36. read from the remote upstream.
  37. * `failed_request_ttl` (default `#{inspect(@failed_request_ttl)}` ms): the time the failed request is cached and cannot be retried.
  38. * `inline_content_types`:
  39. * `true` will not alter `content-disposition` (up to the upstream),
  * `false` will add `content-disposition: attachment` to any response,
  41. * a list of whitelisted content types
  42. * `keep_user_agent` will forward the client's user-agent to the upstream. This may be useful if the upstream is
  43. doing content transformation (encoding, …) depending on the request.
  44. * `req_headers`, `resp_headers` additional headers.
  45. * `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
  46. """
  # Base HTTP client options; call/3 merges the caller's `:http` option on top of these.
  @default_options [pool: :media]
  # Default for the `:inline_content_types` option: content types that are NOT
  # forced to `content-disposition: attachment`.
  @inline_content_types [
    "image/gif",
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/svg+xml",
    "audio/mpeg",
    "audio/mp3",
    "video/webm",
    "video/mp4",
    "video/quicktime"
  ]
  60. require Logger
  61. import Plug.Conn
  @typedoc """
  A single option accepted by `call/3`.

  `:http` is a keyword list handed to the HTTP client (merged over the module
  defaults). `:attachment_name` is the fallback file name used when a
  `content-disposition: attachment` header has to be generated; `call/3` fills
  it in from the URL when the caller does not provide it.
  """
  @type option() ::
          {:keep_user_agent, boolean}
          | {:max_read_duration, :timer.time() | :infinity}
          | {:max_body_length, non_neg_integer() | :infinity}
          | {:failed_request_ttl, :timer.time() | :infinity}
          # `[]` in a typespec means "the empty list only"; client options are a keyword list.
          | {:http, keyword()}
          | {:req_headers, [{String.t(), String.t()}]}
          | {:resp_headers, [{String.t(), String.t()}]}
          | {:inline_content_types, boolean() | [String.t()]}
          | {:redirect_on_failure, boolean()}
          | {:attachment_name, String.t()}
  @doc """
  Proxies the request in `conn` to `url` and returns the halted connection.

  Only the methods in `@methods` are proxied; any other method is answered
  with a 400. URLs present in the `:failed_proxy_url_cache` cache are not
  requested again. On failure the client either receives an error status or,
  with `redirect_on_failure: true`, a redirect to `url`.

  See the module documentation for the supported `opts`.
  """
  @spec call(Plug.Conn.t(), url :: String.t(), [option()]) :: Plug.Conn.t()
  def call(_conn, _url, _opts \\ [])

  def call(%{method: method} = conn, url, opts) when method in @methods do
    client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
    req_headers = build_req_headers(conn.req_headers, opts)

    # Remember the file name from the URL as a fallback for content-disposition,
    # unless the caller already chose one.
    opts =
      if filename = Pleroma.Web.MediaProxy.filename(url) do
        Keyword.put_new(opts, :attachment_name, filename)
      else
        opts
      end

    max_body_length = Keyword.get(opts, :max_body_length, @max_body_length)

    with {:ok, nil} <- Cachex.get(:failed_proxy_url_cache, url),
         {:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
         :ok <- header_length_or_close(client, headers, max_body_length) do
      response(conn, client, url, code, headers, opts)
    else
      # The URL is flagged in the failed-request cache: do not hit the upstream again.
      {:ok, true} ->
        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()

      # The client returned a valid status without a body reference to stream from.
      {:ok, code, headers} ->
        conn
        |> head_response(url, code, headers, opts)
        |> halt()

      {:error, {:invalid_http_response, code}} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
        track_failed_url(url, code, opts)

        conn
        |> error_or_redirect(url, code, failure_body(code), opts)
        |> halt()

      {:error, error} ->
        Logger.error("#{__MODULE__}: request to #{inspect(url)} failed: #{inspect(error)}")
        track_failed_url(url, error, opts)

        conn
        |> error_or_redirect(url, 500, "Request failed", opts)
        |> halt()
    end
  end

  def call(conn, _, _) do
    conn
    |> send_resp(400, Plug.Conn.Status.reason_phrase(400))
    |> halt()
  end

  # Applies the content-length check and, when it fails, closes the already
  # opened upstream client so the connection is not leaked: the body will
  # never be read on that path.
  defp header_length_or_close(client, headers, limit) do
    with {:error, _reason} = error <- header_length_constraint(headers, limit) do
      client().close(client)
      error
    end
  end

  # Builds the error body for an unexpected upstream status.
  # `Plug.Conn.Status.reason_phrase/1` raises for codes Plug does not know
  # (e.g. vendor-specific 5xx codes), so fall back to a generic message.
  defp failure_body(code) do
    "Request failed: " <> Plug.Conn.Status.reason_phrase(code)
  rescue
    ArgumentError -> "Request failed"
  end
  # Sends the upstream request through the configured client and normalises
  # the reply:
  #   * accepted statuses (@valid_resp_codes) keep their shape, with header
  #     names downcased;
  #   * any other status becomes {:error, {:invalid_http_response, status}};
  #   * client errors are passed through untouched.
  defp request(method, url, headers, opts) do
    Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")

    verb =
      method
      |> String.downcase()
      |> String.to_existing_atom()

    case client().request(verb, url, headers, "", opts) do
      {:error, _reason} = failure ->
        failure

      {:ok, status, _headers} when status not in @valid_resp_codes ->
        {:error, {:invalid_http_response, status}}

      {:ok, status, _headers, _ref} when status not in @valid_resp_codes ->
        {:error, {:invalid_http_response, status}}

      {:ok, status, resp_headers} ->
        {:ok, status, downcase_headers(resp_headers)}

      {:ok, status, resp_headers, ref} ->
        {:ok, status, downcase_headers(resp_headers), ref}
    end
  end
  # Starts a chunked response with the filtered upstream headers and streams
  # the upstream body to the client. The connection is always returned halted;
  # on a streaming error the upstream client is closed.
  defp response(conn, client, url, status, headers, opts) do
    Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")

    resp_headers = build_resp_headers(headers, opts)

    streamed =
      conn
      |> put_resp_headers(resp_headers)
      |> send_chunked(status)
      |> chunk_reply(client, opts)

    case streamed do
      {:ok, conn} ->
        halt(conn)

      {:error, reason, conn} ->
        # A :closed error is not logged; every other failure is.
        if reason != :closed do
          Logger.warn(
            "#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(reason)}"
          )
        end

        client().close(client)
        halt(conn)
    end
  end
  # Entry point of the streaming loop: nothing sent yet, no time spent reading.
  defp chunk_reply(conn, client, opts), do: chunk_reply(conn, client, opts, 0, 0)

  # One iteration of the streaming loop: check the read-time budget, pull the
  # next piece of the upstream body, account for time and size, forward the
  # piece to the client, then recurse.
  #
  # Returns {:ok, conn} once the client reports :done, or {:error, reason, conn}
  # as soon as any step returns {:error, reason}.
  defp chunk_reply(conn, client, opts, sent_so_far, duration) do
    max_duration = Keyword.get(opts, :max_read_duration, @max_read_duration)
    max_length = Keyword.get(opts, :max_body_length, @max_body_length)

    with {:ok, timer} <- check_read_duration(duration, max_duration),
         {:ok, payload, client} <- client().stream_body(client),
         {:ok, elapsed} <- increase_read_duration(timer),
         total = sent_so_far + byte_size(payload),
         :ok <- body_size_constraint(total, max_length),
         {:ok, conn} <- chunk(conn, payload) do
      chunk_reply(conn, client, opts, total, elapsed)
    else
      :done -> {:ok, conn}
      {:error, reason} -> {:error, reason, conn}
    end
  end
  # Replies with the upstream status and the filtered upstream headers, and an
  # empty body.
  defp head_response(conn, url, code, headers, opts) do
    Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")

    resp_headers = build_resp_headers(headers, opts)

    conn
    |> put_resp_headers(resp_headers)
    |> send_resp(code, "")
  end
  # Answers a failed proxy attempt: redirects the client to `url` when the
  # `:redirect_on_failure` option is truthy, otherwise sends `code` with `body`.
  # The returned connection is halted in both cases.
  defp error_or_redirect(conn, url, code, body, opts) do
    replied =
      case Keyword.get(opts, :redirect_on_failure, false) do
        disabled when disabled in [false, nil] -> send_resp(conn, code, body)
        _enabled -> Phoenix.Controller.redirect(conn, external: url)
      end

    halt(replied)
  end
  # Returns `headers` with every header name lowercased; values are untouched.
  defp downcase_headers(headers), do: Enum.map(headers, &downcase_header_name/1)

  defp downcase_header_name({name, value}), do: {String.downcase(name), value}
  # Extracts the media type from the "content-type" header, i.e. everything
  # before the first ";". Falls back to "application/octet-stream" when the
  # header is absent. Expects header names to be lowercase already.
  defp get_content_type(headers) do
    fallback = {"content-type", "application/octet-stream"}
    {_name, raw_value} = List.keyfind(headers, "content-type", 0, fallback)

    raw_value
    |> String.split(";")
    |> hd()
  end
  # Puts every {name, value} pair of `headers` on `conn` as a response header,
  # in list order.
  defp put_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn {name, value}, acc ->
      put_resp_header(acc, name, value)
    end)
  end
  # Builds the header list sent to the upstream: the client's headers,
  # lowercased and filtered to @keep_req_headers, adjusted for range requests
  # and user-agent handling, then overridden/extended by the `:req_headers`
  # option.
  #
  # Headers are string-keyed tuples, not a keyword list, so `Keyword.merge/2`
  # cannot be used here: it raises ArgumentError as soon as a non-empty
  # `:req_headers` is given. The merge below keeps the same semantics (extra
  # headers replace same-named ones and are appended) without that restriction.
  # Names in `:req_headers` are compared as given, so they should be lowercase.
  defp build_req_headers(headers, opts) do
    extra_headers = Keyword.get(opts, :req_headers, [])
    overridden = Enum.map(extra_headers, fn {name, _value} -> name end)

    headers
    |> downcase_headers()
    |> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
    |> build_req_range_or_encoding_header(opts)
    |> build_req_user_agent_header(opts)
    |> Enum.reject(fn {name, _value} -> name in overridden end)
    |> Kernel.++(extra_headers)
  end
  # Drops "accept-encoding" when the request carries any of @range_headers,
  # so that no content-encoding is negotiated for range requests (see #1823).
  # Without a range header, or without "accept-encoding", the list is
  # returned unchanged.
  defp build_req_range_or_encoding_header(headers, _opts) do
    wants_range? = Enum.any?(headers, fn {name, _value} -> name in @range_headers end)

    if wants_range? do
      # keydelete is a no-op when the header is absent.
      List.keydelete(headers, "accept-encoding", 0)
    else
      headers
    end
  end
  # Decides which user-agent is sent to the upstream.
  #
  # With `keep_user_agent: true` the headers are left alone, so the client's
  # own user-agent (already kept by the @keep_req_headers filter) is forwarded.
  # Otherwise, and by default, the user-agent is set to the instance's own.
  #
  # The condition used to be inverted, which contradicted the documented
  # meaning of the option and leaked client user-agents by default.
  defp build_req_user_agent_header(headers, opts) do
    if Keyword.get(opts, :keep_user_agent, false) do
      headers
    else
      List.keystore(
        headers,
        "user-agent",
        0,
        {"user-agent", Pleroma.Application.user_agent()}
      )
    end
  end
  # Builds the header list sent to the client: upstream headers filtered to
  # @keep_resp_headers, plus cache-control and content-disposition handling,
  # then overridden/extended by the `:resp_headers` option.
  #
  # `headers` must already have lowercase names. Headers are string-keyed
  # tuples, not a keyword list, so `Keyword.merge/2` cannot be used here: it
  # raises ArgumentError as soon as a non-empty `:resp_headers` is given. The
  # merge below keeps the same semantics (extra headers replace same-named
  # ones and are appended) without that restriction. Names in `:resp_headers`
  # are compared as given, so they should be lowercase.
  defp build_resp_headers(headers, opts) do
    extra_headers = Keyword.get(opts, :resp_headers, [])
    overridden = Enum.map(extra_headers, fn {name, _value} -> name end)

    headers
    |> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
    |> build_resp_cache_headers(opts)
    |> build_resp_content_disposition_header(opts)
    |> Enum.reject(fn {name, _value} -> name in overridden end)
    |> Kernel.++(extra_headers)
  end
  # Stores our own cache-control header on the response, replacing any
  # existing "cache-control" entry. We need to set it ourselves because Plug
  # defaults to "max-age=0, private, must-revalidate". The value is the same
  # whether or not the upstream sent caching headers (etag, date,
  # last-modified).
  defp build_resp_cache_headers(headers, _opts) do
    cache_control = {"cache-control", @default_cache_control_header}
    List.keystore(headers, "cache-control", 0, cache_control)
  end
  # Forces `content-disposition: attachment` when the response's content type
  # must not be displayed inline, according to the `:inline_content_types`
  # option:
  #   * a list  -> attachment unless the content type is in the list,
  #   * `false` -> always attachment,
  #   * anything else -> headers are returned unchanged.
  #
  # The attachment file name is taken from the upstream content-disposition
  # header (`filename="..."`) when it has one, otherwise from the
  # `:attachment_name` option, otherwise "attachment".
  defp build_resp_content_disposition_header(headers, opts) do
    content_type = get_content_type(headers)

    force_attachment? =
      case Keyword.get(opts, :inline_content_types, @inline_content_types) do
        false -> true
        allowed when is_list(allowed) -> content_type not in allowed
        _other -> false
      end

    if force_attachment? do
      filename =
        with {"content-disposition", upstream_value} <-
               List.keyfind(headers, "content-disposition", 0),
             [quoted_name | _] <-
               Regex.run(
                 ~r/filename="((?:[^"\\]|\\.)*)"/u,
                 upstream_value || "",
                 capture: :all_but_first
               ) do
          quoted_name
        else
          # No upstream header, or no quoted filename in it.
          _ -> Keyword.get(opts, :attachment_name, "attachment")
        end

      List.keystore(
        headers,
        "content-disposition",
        0,
        {"content-disposition", "attachment; filename=\"#{filename}\""}
      )
    else
      headers
    end
  end
  # Validates the upstream "content-length" header against `limit`.
  #
  # Returns {:error, :body_too_large} only when the header is present, parses
  # as an integer and exceeds `limit`. A missing or unparsable header, or a
  # `limit` that is not a positive integer (e.g. :infinity), yields :ok.
  defp header_length_constraint(headers, limit) when is_integer(limit) and limit > 0 do
    declared =
      case List.keyfind(headers, "content-length", 0) do
        {_name, raw_length} -> Integer.parse(raw_length)
        _missing -> :error
      end

    case declared do
      {bytes, _rest} when bytes > limit -> {:error, :body_too_large}
      _within_limit_or_unknown -> :ok
    end
  end

  defp header_length_constraint(_headers, _limit), do: :ok
  # Checks the number of body bytes handled so far against `limit`.
  #
  # Returns {:error, :body_too_large} when `limit` is a positive integer and
  # `size` exceeds it; :ok otherwise (including for :infinity).
  #
  # The comparison is strict so that it agrees with the content-length header
  # check, which accepts a body of exactly `limit` bytes. With `>=`, such a
  # body passed the header check and was then cut off on its last chunk.
  defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size > limit do
    {:error, :body_too_large}
  end

  defp body_size_constraint(_, _), do: :ok
  # Read-time accounting for the streaming loop.
  #
  # check_read_duration/2 is called before reading from the upstream with the
  # time spent so far (ms) and the allowed maximum. It returns:
  #   * {:error, :read_duration_exceeded} when the budget is used up,
  #   * {:ok, {duration, started_at}} to be handed to increase_read_duration/1
  #     once the read is done,
  #   * {:ok, :no_duration_limit} when no positive integer limit applies
  #     (e.g. `max` is :infinity).
  #
  # increase_read_duration/1 returns {:ok, new_duration}, or passes
  # {:ok, :no_duration_limit} along when there is no limit.
  #
  # Both no-limit clauses used to return a 3-tuple, which matched neither the
  # {:ok, duration} pattern of the caller's `with` nor any of its `else`
  # clauses, so an unlimited read duration crashed with a WithClauseError.
  #
  # Elapsed time is measured with the monotonic clock so that wall-clock
  # adjustments cannot distort the measured read duration.
  defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)

  defp check_read_duration(duration, max)
       when is_integer(duration) and is_integer(max) and max > 0 do
    if duration > max do
      {:error, :read_duration_exceeded}
    else
      {:ok, {duration, System.monotonic_time(:millisecond)}}
    end
  end

  defp check_read_duration(_, _), do: {:ok, :no_duration_limit}

  defp increase_read_duration({previous_duration, started})
       when is_integer(previous_duration) and is_integer(started) do
    duration = System.monotonic_time(:millisecond) - started
    {:ok, previous_duration + duration}
  end

  defp increase_read_duration(_) do
    {:ok, :no_duration_limit}
  end
  # The module every upstream HTTP call (request, stream_body, close) goes through.
  defp client do
    Pleroma.ReverseProxy.Client
  end
  # Flags `url` as failed in the :failed_proxy_url_cache cache so that call/3
  # does not request it again while the entry exists.
  #
  # For :body_too_large, 400 and 204 the entry is stored with `ttl: nil`
  # (presumably "no explicit expiry" for Cachex; confirm against the cache
  # configuration). Any other error uses the `:failed_request_ttl` option.
  defp track_failed_url(url, error, opts) do
    without_ttl? = error in [:body_too_large, 400, 204]

    ttl =
      if without_ttl? do
        nil
      else
        Keyword.get(opts, :failed_request_ttl, @failed_request_ttl)
      end

    Cachex.put(:failed_proxy_url_cache, url, true, ttl: ttl)
  end
  349. end