logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://anongit.hacktivis.me/git/pleroma.git/

activity_pub.ex (57440B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.ActivityPub do
  5. alias Pleroma.Activity
  6. alias Pleroma.Activity.Ir.Topics
  7. alias Pleroma.Config
  8. alias Pleroma.Constants
  9. alias Pleroma.Conversation
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Filter
  12. alias Pleroma.Hashtag
  13. alias Pleroma.Maps
  14. alias Pleroma.Notification
  15. alias Pleroma.Object
  16. alias Pleroma.Object.Containment
  17. alias Pleroma.Object.Fetcher
  18. alias Pleroma.Pagination
  19. alias Pleroma.Repo
  20. alias Pleroma.Upload
  21. alias Pleroma.User
  22. alias Pleroma.Web.ActivityPub.MRF
  23. alias Pleroma.Web.ActivityPub.Transmogrifier
  24. alias Pleroma.Web.Streamer
  25. alias Pleroma.Web.WebFinger
  26. alias Pleroma.Workers.BackgroundWorker
  27. alias Pleroma.Workers.PollWorker
  28. import Ecto.Query
  29. import Pleroma.Web.ActivityPub.Utils
  30. import Pleroma.Web.ActivityPub.Visibility
  31. require Logger
  32. require Pleroma.Constants
  33. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
  34. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
  35. defp get_recipients(%{"type" => "Create"} = data) do
  36. to = Map.get(data, "to", [])
  37. cc = Map.get(data, "cc", [])
  38. bcc = Map.get(data, "bcc", [])
  39. actor = Map.get(data, "actor", [])
  40. recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
  41. {recipients, to, cc}
  42. end
  43. defp get_recipients(data) do
  44. to = Map.get(data, "to", [])
  45. cc = Map.get(data, "cc", [])
  46. bcc = Map.get(data, "bcc", [])
  47. recipients = Enum.concat([to, cc, bcc])
  48. {recipients, to, cc}
  49. end
  50. defp check_actor_can_insert(%{"type" => "Delete"}), do: true
  51. defp check_actor_can_insert(%{"type" => "Undo"}), do: true
  52. defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  53. case User.get_cached_by_ap_id(actor) do
  54. %User{is_active: true} -> true
  55. _ -> false
  56. end
  57. end
  58. defp check_actor_can_insert(_), do: true
  59. defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  60. limit = Config.get([:instance, :remote_limit])
  61. String.length(content) <= limit
  62. end
  63. defp check_remote_limit(_), do: true
  64. def increase_note_count_if_public(actor, object) do
  65. if public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
  66. end
  67. def decrease_note_count_if_public(actor, object) do
  68. if public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
  69. end
  70. def update_last_status_at_if_public(actor, object) do
  71. if public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
  72. end
  73. defp increase_replies_count_if_reply(%{
  74. "object" => %{"inReplyTo" => reply_ap_id} = object,
  75. "type" => "Create"
  76. }) do
  77. if public?(object) do
  78. Object.increase_replies_count(reply_ap_id)
  79. end
  80. end
  81. defp increase_replies_count_if_reply(_create_data), do: :noop
  82. defp increase_quotes_count_if_quote(%{
  83. "object" => %{"quoteUrl" => quote_ap_id} = object,
  84. "type" => "Create"
  85. }) do
  86. if public?(object) do
  87. Object.increase_quotes_count(quote_ap_id)
  88. end
  89. end
  90. defp increase_quotes_count_if_quote(_create_data), do: :noop
  91. @object_types ~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]
  92. @impl true
  93. def persist(%{"type" => type} = object, meta) when type in @object_types do
  94. with {:ok, object} <- Object.create(object) do
  95. {:ok, object, meta}
  96. end
  97. end
  98. @impl true
  99. def persist(object, meta) do
  100. with local <- Keyword.fetch!(meta, :local),
  101. {recipients, _, _} <- get_recipients(object),
  102. {:ok, activity} <-
  103. Repo.insert(%Activity{
  104. data: object,
  105. local: local,
  106. recipients: recipients,
  107. actor: object["actor"]
  108. }),
  109. # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  110. {:ok, _} <- maybe_create_activity_expiration(activity) do
  111. {:ok, activity, meta}
  112. end
  113. end
  114. @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  115. def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  116. with nil <- Activity.normalize(map),
  117. map <- lazy_put_activity_defaults(map, fake),
  118. {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
  119. {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
  120. {:ok, map} <- MRF.filter(map),
  121. {recipients, _, _} = get_recipients(map),
  122. {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
  123. {:containment, :ok} <- {:containment, Containment.contain_child(map)},
  124. {:ok, map, object} <- insert_full_object(map),
  125. {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
  126. # Splice in the child object if we have one.
  127. activity = Maps.put_if_present(activity, :object, object)
  128. Pleroma.Web.RichMedia.Card.get_by_activity(activity)
  129. # Add local posts to search index
  130. if local, do: Pleroma.Search.add_to_index(activity)
  131. {:ok, activity}
  132. else
  133. %Activity{} = activity ->
  134. {:ok, activity}
  135. {:actor_check, _} ->
  136. {:error, false}
  137. {:containment, _} = error ->
  138. error
  139. {:error, _} = error ->
  140. error
  141. {:fake, true, map, recipients} ->
  142. activity = %Activity{
  143. data: map,
  144. local: local,
  145. actor: map["actor"],
  146. recipients: recipients,
  147. id: "pleroma:fakeid"
  148. }
  149. Pleroma.Web.RichMedia.Card.get_by_activity(activity)
  150. {:ok, activity}
  151. {:remote_limit_pass, _} ->
  152. {:error, :remote_limit}
  153. {:reject, _} = e ->
  154. {:error, e}
  155. end
  156. end
  157. defp insert_activity_with_expiration(data, local, recipients) do
  158. struct = %Activity{
  159. data: data,
  160. local: local,
  161. actor: data["actor"],
  162. recipients: recipients
  163. }
  164. with {:ok, activity} <- Repo.insert(struct) do
  165. maybe_create_activity_expiration(activity)
  166. end
  167. end
  168. def notify_and_stream(activity) do
  169. {:ok, notifications} = Notification.create_notifications(activity)
  170. Notification.stream(notifications)
  171. original_activity =
  172. case activity do
  173. %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
  174. Activity.get_create_by_object_ap_id_with_object(id)
  175. _ ->
  176. activity
  177. end
  178. conversation = create_or_bump_conversation(original_activity, original_activity.actor)
  179. participations = get_participations(conversation)
  180. stream_out(activity)
  181. stream_out_participations(participations)
  182. end
  183. defp maybe_create_activity_expiration(
  184. %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
  185. ) do
  186. with {:ok, _job} <-
  187. Pleroma.Workers.PurgeExpiredActivity.enqueue(
  188. %{
  189. activity_id: activity.id
  190. },
  191. scheduled_at: expires_at
  192. ) do
  193. {:ok, activity}
  194. end
  195. end
  196. defp maybe_create_activity_expiration(activity), do: {:ok, activity}
  197. defp create_or_bump_conversation(activity, actor) do
  198. with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
  199. %User{} = user <- User.get_cached_by_ap_id(actor) do
  200. Participation.mark_as_read(user, conversation)
  201. {:ok, conversation}
  202. end
  203. end
  204. defp get_participations({:ok, conversation}) do
  205. conversation
  206. |> Repo.preload(:participations, force: true)
  207. |> Map.get(:participations)
  208. end
  209. defp get_participations(_), do: []
  210. def stream_out_participations(participations) do
  211. participations =
  212. participations
  213. |> Repo.preload(:user)
  214. Streamer.stream("participation", participations)
  215. end
  216. @impl true
  217. def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  218. with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
  219. conversation = Repo.preload(conversation, :participations)
  220. last_activity_id =
  221. fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
  222. user: user,
  223. blocking_user: user
  224. })
  225. if last_activity_id do
  226. stream_out_participations(conversation.participations)
  227. end
  228. end
  229. end
  230. @impl true
  231. def stream_out_participations(_, _), do: :noop
  232. @impl true
  233. def stream_out(%Activity{data: %{"type" => data_type}} = activity)
  234. when data_type in ["Create", "Announce", "Delete", "Update"] do
  235. activity
  236. |> Topics.get_activity_topics()
  237. |> Streamer.stream(activity)
  238. end
  239. @impl true
  240. def stream_out(_activity) do
  241. :noop
  242. end
  243. @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  244. def create(params, fake \\ false) do
  245. with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
  246. result
  247. end
  248. end
  249. defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  250. additional = params[:additional] || %{}
  251. # only accept false as false value
  252. local = !(params[:local] == false)
  253. published = params[:published]
  254. quick_insert? = Config.get([:env]) == :benchmark
  255. create_data =
  256. make_create_data(
  257. %{to: to, actor: actor, published: published, context: context, object: object},
  258. additional
  259. )
  260. with {:ok, activity} <- insert(create_data, local, fake),
  261. {:fake, false, activity} <- {:fake, fake, activity},
  262. _ <- increase_replies_count_if_reply(create_data),
  263. _ <- increase_quotes_count_if_quote(create_data),
  264. {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
  265. {:ok, _actor} <- increase_note_count_if_public(actor, activity),
  266. {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
  267. _ <- notify_and_stream(activity),
  268. :ok <- maybe_schedule_poll_notifications(activity),
  269. :ok <- maybe_handle_group_posts(activity),
  270. :ok <- maybe_federate(activity) do
  271. {:ok, activity}
  272. else
  273. {:quick_insert, true, activity} ->
  274. {:ok, activity}
  275. {:fake, true, activity} ->
  276. {:ok, activity}
  277. {:error, message} ->
  278. Repo.rollback(message)
  279. end
  280. end
  281. defp maybe_schedule_poll_notifications(activity) do
  282. PollWorker.schedule_poll_end(activity)
  283. :ok
  284. end
  285. @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
  286. def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  287. additional = params[:additional] || %{}
  288. # only accept false as false value
  289. local = !(params[:local] == false)
  290. published = params[:published]
  291. listen_data =
  292. make_listen_data(
  293. %{to: to, actor: actor, published: published, context: context, object: object},
  294. additional
  295. )
  296. with {:ok, activity} <- insert(listen_data, local),
  297. _ <- notify_and_stream(activity),
  298. :ok <- maybe_federate(activity) do
  299. {:ok, activity}
  300. end
  301. end
  302. @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
  303. {:ok, Activity.t()} | nil | {:error, any()}
  304. def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  305. with {:ok, result} <-
  306. Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
  307. result
  308. end
  309. end
  310. defp do_unfollow(follower, followed, activity_id, local) do
  311. with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
  312. {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
  313. unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
  314. {:ok, activity} <- insert(unfollow_data, local),
  315. _ <- notify_and_stream(activity),
  316. :ok <- maybe_federate(activity) do
  317. {:ok, activity}
  318. else
  319. nil -> nil
  320. {:error, error} -> Repo.rollback(error)
  321. end
  322. end
  323. @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
  324. def flag(params) do
  325. with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
  326. result
  327. end
  328. end
  329. defp do_flag(
  330. %{
  331. actor: actor,
  332. context: _context,
  333. account: account,
  334. statuses: statuses,
  335. content: content
  336. } = params
  337. ) do
  338. # only accept false as false value
  339. local = !(params[:local] == false)
  340. forward = !(params[:forward] == false)
  341. additional = params[:additional] || %{}
  342. additional =
  343. if forward do
  344. Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
  345. else
  346. Map.merge(additional, %{"to" => [], "cc" => []})
  347. end
  348. with flag_data <- make_flag_data(params, additional),
  349. {:ok, activity} <- insert(flag_data, local),
  350. {:ok, stripped_activity} <- strip_report_status_data(activity),
  351. _ <- notify_and_stream(activity),
  352. :ok <-
  353. maybe_federate(stripped_activity) do
  354. User.all_users_with_privilege(:reports_manage_reports)
  355. |> Enum.filter(fn user -> user.ap_id != actor end)
  356. |> Enum.filter(fn user -> not is_nil(user.email) end)
  357. |> Enum.each(fn privileged_user ->
  358. privileged_user
  359. |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
  360. |> Pleroma.Emails.Mailer.deliver_async()
  361. end)
  362. {:ok, activity}
  363. else
  364. {:error, error} -> Repo.rollback(error)
  365. end
  366. end
  367. @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  368. def move(%User{} = origin, %User{} = target, local \\ true) do
  369. params = %{
  370. "type" => "Move",
  371. "actor" => origin.ap_id,
  372. "object" => origin.ap_id,
  373. "target" => target.ap_id,
  374. "to" => [origin.follower_address]
  375. }
  376. with true <- origin.ap_id in target.also_known_as,
  377. {:ok, activity} <- insert(params, local),
  378. _ <- notify_and_stream(activity) do
  379. maybe_federate(activity)
  380. BackgroundWorker.new(%{
  381. "op" => "move_following",
  382. "origin_id" => origin.id,
  383. "target_id" => target.id
  384. })
  385. |> Oban.insert()
  386. {:ok, activity}
  387. else
  388. false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
  389. err -> err
  390. end
  391. end
  392. def fetch_activities_for_context_query(context, opts) do
  393. public = [Constants.as_public()]
  394. recipients =
  395. if opts[:user],
  396. do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
  397. else: public
  398. from(activity in Activity)
  399. |> maybe_preload_objects(opts)
  400. |> maybe_preload_bookmarks(opts)
  401. |> maybe_set_thread_muted_field(opts)
  402. |> restrict_unauthenticated(opts[:user])
  403. |> restrict_blocked(opts)
  404. |> restrict_blockers_visibility(opts)
  405. |> restrict_recipients(recipients, opts[:user])
  406. |> restrict_filtered(opts)
  407. |> where(
  408. [activity],
  409. fragment(
  410. "?->>'type' = ? and ?->>'context' = ?",
  411. activity.data,
  412. "Create",
  413. activity.data,
  414. ^context
  415. )
  416. )
  417. |> exclude_poll_votes(opts)
  418. |> exclude_id(opts)
  419. |> order_by([activity], desc: activity.id)
  420. end
  421. @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
  422. def fetch_activities_for_context(context, opts \\ %{}) do
  423. context
  424. |> fetch_activities_for_context_query(opts)
  425. |> Repo.all()
  426. end
  427. @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
  428. Ecto.UUID.t() | nil
  429. def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  430. context
  431. |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
  432. |> restrict_visibility(%{visibility: "direct"})
  433. |> limit(1)
  434. |> select([a], a.id)
  435. |> Repo.one()
  436. end
  437. defp fetch_paginated_optimized(query, opts, pagination) do
  438. # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  439. # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
  440. opts = Map.put(opts, :skip_extra_order, true)
  441. Pagination.fetch_paginated(query, opts, pagination)
  442. end
  443. def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  444. list_memberships = Pleroma.List.memberships(opts[:user])
  445. fetch_activities_query(recipients ++ list_memberships, opts)
  446. |> fetch_paginated_optimized(opts, pagination)
  447. |> Enum.reverse()
  448. |> maybe_update_cc(list_memberships, opts[:user])
  449. end
  450. @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
  451. def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  452. includes_local_public = Map.get(opts, :includes_local_public, false)
  453. opts = Map.delete(opts, :user)
  454. intended_recipients =
  455. if includes_local_public do
  456. [Constants.as_public(), as_local_public()]
  457. else
  458. [Constants.as_public()]
  459. end
  460. intended_recipients
  461. |> fetch_activities_query(opts)
  462. |> restrict_unlisted(opts)
  463. |> fetch_paginated_optimized(opts, pagination)
  464. end
  465. @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
  466. def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  467. opts
  468. |> Map.put(:restrict_unlisted, true)
  469. |> fetch_public_or_unlisted_activities(pagination)
  470. end
  471. @valid_visibilities ~w[direct unlisted public private]
  472. defp restrict_visibility(query, %{visibility: visibility})
  473. when is_list(visibility) do
  474. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  475. from(
  476. a in query,
  477. where:
  478. fragment(
  479. "activity_visibility(?, ?, ?) = ANY (?)",
  480. a.actor,
  481. a.recipients,
  482. a.data,
  483. ^visibility
  484. )
  485. )
  486. else
  487. Logger.error("Could not restrict visibility to #{visibility}")
  488. end
  489. end
  490. defp restrict_visibility(query, %{visibility: visibility})
  491. when visibility in @valid_visibilities do
  492. from(
  493. a in query,
  494. where:
  495. fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  496. )
  497. end
  498. defp restrict_visibility(_query, %{visibility: visibility})
  499. when visibility not in @valid_visibilities do
  500. Logger.error("Could not restrict visibility to #{visibility}")
  501. end
  502. defp restrict_visibility(query, _visibility), do: query
  503. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  504. when is_list(visibility) do
  505. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  506. from(
  507. a in query,
  508. where:
  509. not fragment(
  510. "activity_visibility(?, ?, ?) = ANY (?)",
  511. a.actor,
  512. a.recipients,
  513. a.data,
  514. ^visibility
  515. )
  516. )
  517. else
  518. Logger.error("Could not exclude visibility to #{visibility}")
  519. query
  520. end
  521. end
  522. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  523. when visibility in @valid_visibilities do
  524. from(
  525. a in query,
  526. where:
  527. not fragment(
  528. "activity_visibility(?, ?, ?) = ?",
  529. a.actor,
  530. a.recipients,
  531. a.data,
  532. ^visibility
  533. )
  534. )
  535. end
  536. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  537. when visibility not in [nil | @valid_visibilities] do
  538. Logger.error("Could not exclude visibility to #{visibility}")
  539. query
  540. end
  541. defp exclude_visibility(query, _visibility), do: query
  542. defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  543. do: query
  544. defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  545. do: query
  546. defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  547. local_public = as_local_public()
  548. from(
  549. a in query,
  550. where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  551. )
  552. end
  553. defp restrict_thread_visibility(query, _, _), do: query
  554. def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  555. params =
  556. params
  557. |> Map.put(:user, reading_user)
  558. |> Map.put(:actor_id, user.ap_id)
  559. %{
  560. godmode: params[:godmode],
  561. reading_user: reading_user
  562. }
  563. |> user_activities_recipients()
  564. |> fetch_activities(params)
  565. |> Enum.reverse()
  566. end
  567. def fetch_user_activities(user, reading_user, params \\ %{})
  568. def fetch_user_activities(user, reading_user, %{total: true} = params) do
  569. result = fetch_activities_for_user(user, reading_user, params)
  570. Keyword.put(result, :items, Enum.reverse(result[:items]))
  571. end
  572. def fetch_user_activities(user, reading_user, params) do
  573. user
  574. |> fetch_activities_for_user(reading_user, params)
  575. |> Enum.reverse()
  576. end
  577. defp fetch_activities_for_user(user, reading_user, params) do
  578. params =
  579. params
  580. |> Map.put(:type, ["Create", "Announce"])
  581. |> Map.put(:user, reading_user)
  582. |> Map.put(:actor_id, user.ap_id)
  583. |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
  584. params =
  585. if User.blocks?(reading_user, user) do
  586. params
  587. else
  588. params
  589. |> Map.put(:blocking_user, reading_user)
  590. |> Map.put(:muting_user, reading_user)
  591. end
  592. pagination_type = Map.get(params, :pagination_type) || :keyset
  593. %{
  594. godmode: params[:godmode],
  595. reading_user: reading_user
  596. }
  597. |> user_activities_recipients()
  598. |> fetch_activities(params, pagination_type)
  599. end
  600. def fetch_statuses(reading_user, %{total: true} = params) do
  601. result = fetch_activities_for_reading_user(reading_user, params)
  602. Keyword.put(result, :items, Enum.reverse(result[:items]))
  603. end
  604. def fetch_statuses(reading_user, params) do
  605. reading_user
  606. |> fetch_activities_for_reading_user(params)
  607. |> Enum.reverse()
  608. end
  609. defp fetch_activities_for_reading_user(reading_user, params) do
  610. params = Map.put(params, :type, ["Create", "Announce"])
  611. %{
  612. godmode: params[:godmode],
  613. reading_user: reading_user
  614. }
  615. |> user_activities_recipients()
  616. |> fetch_activities(params, :offset)
  617. end
  618. defp user_activities_recipients(%{godmode: true}), do: []
  619. defp user_activities_recipients(%{reading_user: reading_user}) do
  620. if not is_nil(reading_user) and reading_user.local do
  621. [
  622. Constants.as_public(),
  623. as_local_public(),
  624. reading_user.ap_id | User.following(reading_user)
  625. ]
  626. else
  627. [Constants.as_public()]
  628. end
  629. end
  630. defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  631. raise "Can't use the child object without preloading!"
  632. end
  633. defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  634. from(
  635. [activity, object] in query,
  636. where:
  637. fragment(
  638. "?->>'type' != ? or ?->>'actor' != ?",
  639. activity.data,
  640. "Announce",
  641. object.data,
  642. ^actor
  643. )
  644. )
  645. end
  646. defp restrict_announce_object_actor(query, _), do: query
  647. defp restrict_since(query, %{since_id: ""}), do: query
  648. defp restrict_since(query, %{since_id: since_id}) do
  649. from(activity in query, where: activity.id > ^since_id)
  650. end
  651. defp restrict_since(query, _), do: query
  652. defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  653. raise_on_missing_preload()
  654. end
  655. defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  656. from(
  657. [_activity, object] in query,
  658. where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  659. )
  660. end
  661. defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  662. restrict_embedded_tag_any(query, %{tag: tag})
  663. end
  664. defp restrict_embedded_tag_all(query, _), do: query
  665. defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  666. raise_on_missing_preload()
  667. end
  668. defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  669. from(
  670. [_activity, object] in query,
  671. where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
  672. )
  673. end
  674. defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  675. restrict_embedded_tag_any(query, %{tag: [tag]})
  676. end
  677. defp restrict_embedded_tag_any(query, _), do: query
  678. defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  679. raise_on_missing_preload()
  680. end
  681. defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  682. from(
  683. [_activity, object] in query,
  684. where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  685. )
  686. end
  687. defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
  688. when is_binary(tag_reject) do
  689. restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
  690. end
  691. defp restrict_embedded_tag_reject_any(query, _), do: query
  692. defp object_ids_query_for_tags(tags) do
  693. from(hto in "hashtags_objects")
  694. |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
  695. |> where([hto, ht], ht.name in ^tags)
  696. |> select([hto], hto.object_id)
  697. |> distinct([hto], true)
  698. end
  699. defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  700. raise_on_missing_preload()
  701. end
  702. defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  703. restrict_hashtag_any(query, %{tag: single_tag})
  704. end
  705. defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  706. from(
  707. [_activity, object] in query,
  708. where:
  709. fragment(
  710. """
  711. (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
  712. ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
  713. AND hashtags_objects.object_id = ?) @> ?
  714. """,
  715. ^tags,
  716. object.id,
  717. ^tags
  718. )
  719. )
  720. end
  721. defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  722. restrict_hashtag_all(query, %{tag_all: [tag]})
  723. end
  724. defp restrict_hashtag_all(query, _), do: query
  725. defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  726. raise_on_missing_preload()
  727. end
  728. defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  729. hashtag_ids =
  730. from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
  731. |> Repo.all()
  732. # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  733. from(
  734. [_activity, object] in query,
  735. join: hto in "hashtags_objects",
  736. on: hto.object_id == object.id,
  737. where: hto.hashtag_id in ^hashtag_ids,
  738. distinct: [desc: object.id],
  739. order_by: [desc: object.id]
  740. )
  741. end
  742. defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  743. restrict_hashtag_any(query, %{tag: [tag]})
  744. end
  745. defp restrict_hashtag_any(query, _), do: query
  746. defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  747. raise_on_missing_preload()
  748. end
  749. defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  750. from(
  751. [_activity, object] in query,
  752. where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
  753. )
  754. end
  755. defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  756. restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
  757. end
  758. defp restrict_hashtag_reject_any(query, _), do: query
  759. defp raise_on_missing_preload do
  760. raise "Can't use the child object without preloading!"
  761. end
  762. defp restrict_recipients(query, [], _user), do: query
  763. defp restrict_recipients(query, recipients, nil) do
  764. from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
  765. end
  766. defp restrict_recipients(query, recipients, user) do
  767. from(
  768. activity in query,
  769. where: fragment("? && ?", ^recipients, activity.recipients),
  770. or_where: activity.actor == ^user.ap_id
  771. )
  772. end
  773. # Essentially, either look for activities addressed to `recipients`, _OR_ ones
  774. # that reference a hashtag that the user follows
  775. # Firstly, two fallbacks in case there's no hashtag constraint, or the user doesn't
  776. # follow any
  777. defp restrict_recipients_or_hashtags(query, recipients, user, nil) do
  778. restrict_recipients(query, recipients, user)
  779. end
  780. defp restrict_recipients_or_hashtags(query, recipients, user, []) do
  781. restrict_recipients(query, recipients, user)
  782. end
  783. defp restrict_recipients_or_hashtags(query, recipients, _user, hashtag_ids) do
  784. from([activity, object] in query)
  785. |> join(:left, [activity, object], hto in "hashtags_objects",
  786. on: hto.object_id == object.id,
  787. as: :hto
  788. )
  789. |> where(
  790. [activity, object, hto: hto],
  791. (hto.hashtag_id in ^hashtag_ids and ^Constants.as_public() in activity.recipients) or
  792. fragment("? && ?", ^recipients, activity.recipients)
  793. )
  794. end
  795. defp restrict_local(query, %{local_only: true}) do
  796. from(activity in query, where: activity.local == true)
  797. end
  798. defp restrict_local(query, _), do: query
  799. defp restrict_remote(query, %{remote: true}) do
  800. from(activity in query, where: activity.local == false)
  801. end
  802. defp restrict_remote(query, _), do: query
  803. defp restrict_actor(query, %{actor_id: actor_id}) do
  804. from(activity in query, where: activity.actor == ^actor_id)
  805. end
  806. defp restrict_actor(query, _), do: query
  807. defp restrict_type(query, %{type: type}) when is_binary(type) do
  808. from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
  809. end
  810. defp restrict_type(query, %{type: type}) do
  811. from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
  812. end
  813. defp restrict_type(query, _), do: query
  814. defp restrict_state(query, %{state: state}) do
  815. from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
  816. end
  817. defp restrict_state(query, _), do: query
  818. defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  819. from(
  820. [_activity, object] in query,
  821. where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  822. )
  823. end
  824. defp restrict_favorited_by(query, _), do: query
  825. defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  826. raise "Can't use the child object without preloading!"
  827. end
  828. defp restrict_media(query, %{only_media: true}) do
  829. from(
  830. [activity, object] in query,
  831. where: fragment("(?)->>'type' = ?", activity.data, "Create"),
  832. where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  833. )
  834. end
  835. defp restrict_media(query, _), do: query
  836. defp restrict_replies(query, %{exclude_replies: true}) do
  837. from(
  838. [activity, object] in query,
  839. where:
  840. fragment("?->>'inReplyTo' is null or ?->>'type' = 'Announce'", object.data, activity.data)
  841. )
  842. end
  843. defp restrict_replies(query, %{
  844. reply_filtering_user: %User{} = user,
  845. reply_visibility: "self"
  846. }) do
  847. from(
  848. [activity, object] in query,
  849. where:
  850. fragment(
  851. "?->>'inReplyTo' is null OR ? = ANY(?)",
  852. object.data,
  853. ^user.ap_id,
  854. activity.recipients
  855. )
  856. )
  857. end
  858. defp restrict_replies(query, %{
  859. reply_filtering_user: %User{} = user,
  860. reply_visibility: "following"
  861. }) do
  862. from(
  863. [activity, object] in query,
  864. where:
  865. fragment(
  866. """
  867. ?->>'type' != 'Create' -- This isn't a Create
  868. OR ?->>'inReplyTo' is null -- this isn't a reply
  869. OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
  870. -- unless they are the author (because authors
  871. -- are also part of the recipients). This leads
  872. -- to a bug that self-replies by friends won't
  873. -- show up.
  874. OR ? = ? -- The actor is us
  875. """,
  876. activity.data,
  877. object.data,
  878. ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
  879. activity.recipients,
  880. activity.actor,
  881. activity.actor,
  882. ^user.ap_id
  883. )
  884. )
  885. end
  886. defp restrict_replies(query, _), do: query
  887. defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  888. from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
  889. end
  890. defp restrict_reblogs(query, _), do: query
  891. defp restrict_muted(query, %{with_muted: true}), do: query
  892. defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  893. mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
  894. query =
  895. from([activity] in query,
  896. where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
  897. where:
  898. fragment(
  899. "not (?->'to' \\?| ?) or ? = ?",
  900. activity.data,
  901. ^mutes,
  902. activity.actor,
  903. ^user.ap_id
  904. )
  905. )
  906. unless opts[:skip_preload] do
  907. from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  908. else
  909. query
  910. end
  911. end
  912. defp restrict_muted(query, _), do: query
  913. defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  914. blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  915. domain_blocks = user.domain_blocks || []
  916. following_ap_ids = User.get_friends_ap_ids(user)
  917. query =
  918. if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
  919. from(
  920. [activity, object: o] in query,
  921. # You don't block the author
  922. where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
  923. # You don't block any recipients, and didn't author the post
  924. where:
  925. fragment(
  926. "((not (? && ?)) or ? = ?)",
  927. activity.recipients,
  928. ^blocked_ap_ids,
  929. activity.actor,
  930. ^user.ap_id
  931. ),
  932. # You don't block the domain of any recipients, and didn't author the post
  933. where:
  934. fragment(
  935. "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
  936. activity.recipients,
  937. ^domain_blocks,
  938. activity.actor,
  939. ^user.ap_id
  940. ),
  941. # It's not a boost of a user you block
  942. where:
  943. fragment(
  944. "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
  945. activity.data,
  946. activity.data,
  947. ^blocked_ap_ids
  948. ),
  949. # You don't block the author's domain, and also don't follow the author
  950. where:
  951. fragment(
  952. "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
  953. activity.actor,
  954. ^domain_blocks,
  955. activity.actor,
  956. ^following_ap_ids
  957. ),
  958. # Same as above, but checks the Object
  959. where:
  960. fragment(
  961. "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
  962. o.data,
  963. ^domain_blocks,
  964. o.data,
  965. ^following_ap_ids
  966. )
  967. )
  968. end
  969. defp restrict_blocked(query, _), do: query
  970. defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  971. if Config.get([:activitypub, :blockers_visible]) == true do
  972. query
  973. else
  974. blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
  975. from(
  976. activity in query,
  977. # The author doesn't block you
  978. where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
  979. # It's not a boost of a user that blocks you
  980. where:
  981. fragment(
  982. "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
  983. activity.data,
  984. activity.data,
  985. ^blocker_ap_ids
  986. )
  987. )
  988. end
  989. end
  990. defp restrict_blockers_visibility(query, _), do: query
  991. defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  992. from(
  993. activity in query,
  994. where:
  995. fragment(
  996. "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
  997. activity.data,
  998. ^[Constants.as_public()]
  999. )
  1000. )
  1001. end
  1002. defp restrict_unlisted(query, _), do: query
  1003. defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  1004. from(
  1005. [activity, object: o] in query,
  1006. where:
  1007. fragment(
  1008. "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
  1009. activity.data,
  1010. activity.data,
  1011. ^ids
  1012. )
  1013. )
  1014. end
  1015. defp restrict_pinned(query, _), do: query
  1016. defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  1017. muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
  1018. from(
  1019. activity in query,
  1020. where:
  1021. fragment(
  1022. "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
  1023. activity.data,
  1024. activity.actor,
  1025. ^muted_reblogs
  1026. )
  1027. )
  1028. end
  1029. defp restrict_muted_reblogs(query, _), do: query
  1030. defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  1031. from(
  1032. activity in query,
  1033. where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  1034. )
  1035. end
  1036. defp restrict_instance(query, _), do: query
  1037. defp restrict_filtered(query, %{user: %User{} = user}) do
  1038. case Filter.compose_regex(user) do
  1039. nil ->
  1040. query
  1041. regex ->
  1042. from([activity, object] in query,
  1043. where:
  1044. fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
  1045. activity.actor == ^user.ap_id
  1046. )
  1047. end
  1048. end
  1049. defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  1050. restrict_filtered(query, %{user: user})
  1051. end
  1052. defp restrict_filtered(query, _), do: query
  1053. defp restrict_unauthenticated(query, nil) do
  1054. local = Config.restrict_unauthenticated_access?(:activities, :local)
  1055. remote = Config.restrict_unauthenticated_access?(:activities, :remote)
  1056. cond do
  1057. local and remote ->
  1058. from(activity in query, where: false)
  1059. local ->
  1060. from(activity in query, where: activity.local == false)
  1061. remote ->
  1062. from(activity in query, where: activity.local == true)
  1063. true ->
  1064. query
  1065. end
  1066. end
  1067. defp restrict_unauthenticated(query, _), do: query
  1068. defp restrict_quote_url(query, %{quote_url: quote_url}) do
  1069. from([_activity, object] in query,
  1070. where: fragment("(?)->'quoteUrl' = ?", object.data, ^quote_url)
  1071. )
  1072. end
  1073. defp restrict_quote_url(query, _), do: query
  1074. defp restrict_rule(query, %{rule_id: rule_id}) do
  1075. from(
  1076. activity in query,
  1077. where: fragment("(?)->'rules' \\? (?)", activity.data, ^rule_id)
  1078. )
  1079. end
  1080. defp restrict_rule(query, _), do: query
  1081. defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
  1082. defp exclude_poll_votes(query, _) do
  1083. if has_named_binding?(query, :object) do
  1084. from([activity, object: o] in query,
  1085. where: fragment("not(?->>'type' = ?)", o.data, "Answer")
  1086. )
  1087. else
  1088. query
  1089. end
  1090. end
  1091. defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
  1092. defp exclude_chat_messages(query, _) do
  1093. if has_named_binding?(query, :object) do
  1094. from([activity, object: o] in query,
  1095. where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
  1096. )
  1097. else
  1098. query
  1099. end
  1100. end
  1101. defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
  1102. defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
  1103. defp exclude_invisible_actors(query, _opts) do
  1104. query
  1105. |> join(:inner, [activity], u in User,
  1106. as: :u,
  1107. on: activity.actor == u.ap_id and u.invisible == false
  1108. )
  1109. end
  1110. defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  1111. from(activity in query, where: activity.id != ^id)
  1112. end
  1113. defp exclude_id(query, _), do: query
  1114. defp maybe_preload_objects(query, %{skip_preload: true}), do: query
  1115. defp maybe_preload_objects(query, _) do
  1116. query
  1117. |> Activity.with_preloaded_object()
  1118. end
  1119. defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
  1120. defp maybe_preload_bookmarks(query, opts) do
  1121. query
  1122. |> Activity.with_preloaded_bookmark(opts[:user])
  1123. end
  1124. defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  1125. query
  1126. |> Activity.with_preloaded_report_notes()
  1127. end
  1128. defp maybe_preload_report_notes(query, _), do: query
  1129. defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
  1130. defp maybe_set_thread_muted_field(query, opts) do
  1131. query
  1132. |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
  1133. end
  1134. defp maybe_order(query, %{order: :desc}) do
  1135. query
  1136. |> order_by(desc: :id)
  1137. end
  1138. defp maybe_order(query, %{order: :asc}) do
  1139. query
  1140. |> order_by(asc: :id)
  1141. end
  1142. defp maybe_order(query, _), do: query
  1143. defp normalize_fetch_activities_query_opts(opts) do
  1144. Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
  1145. case opts[key] do
  1146. value when is_bitstring(value) ->
  1147. Map.put(opts, key, Hashtag.normalize_name(value))
  1148. value when is_list(value) ->
  1149. normalized_value =
  1150. value
  1151. |> Enum.map(&Hashtag.normalize_name/1)
  1152. |> Enum.uniq()
  1153. Map.put(opts, key, normalized_value)
  1154. _ ->
  1155. opts
  1156. end
  1157. end)
  1158. end
  1159. defp fetch_activities_query_ap_ids_ops(opts) do
  1160. source_user = opts[:muting_user]
  1161. ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
  1162. ap_id_relationships =
  1163. if opts[:blocking_user] && opts[:blocking_user] == source_user do
  1164. [:block | ap_id_relationships]
  1165. else
  1166. ap_id_relationships
  1167. end
  1168. preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
  1169. restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
  1170. restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
  1171. restrict_muted_reblogs_opts =
  1172. Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  1173. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
  1174. end
  1175. def fetch_activities_query(recipients, opts \\ %{}) do
  1176. opts = normalize_fetch_activities_query_opts(opts)
  1177. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
  1178. fetch_activities_query_ap_ids_ops(opts)
  1179. config = %{
  1180. skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  1181. }
  1182. query =
  1183. Activity
  1184. |> maybe_preload_objects(opts)
  1185. |> maybe_preload_bookmarks(opts)
  1186. |> maybe_preload_report_notes(opts)
  1187. |> maybe_set_thread_muted_field(opts)
  1188. |> maybe_order(opts)
  1189. |> restrict_recipients_or_hashtags(recipients, opts[:user], opts[:followed_hashtags])
  1190. |> restrict_replies(opts)
  1191. |> restrict_since(opts)
  1192. |> restrict_local(opts)
  1193. |> restrict_remote(opts)
  1194. |> restrict_actor(opts)
  1195. |> restrict_type(opts)
  1196. |> restrict_state(opts)
  1197. |> restrict_favorited_by(opts)
  1198. |> restrict_blocked(restrict_blocked_opts)
  1199. |> restrict_blockers_visibility(opts)
  1200. |> restrict_muted(restrict_muted_opts)
  1201. |> restrict_filtered(opts)
  1202. |> restrict_media(opts)
  1203. |> restrict_visibility(opts)
  1204. |> restrict_thread_visibility(opts, config)
  1205. |> restrict_reblogs(opts)
  1206. |> restrict_pinned(opts)
  1207. |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  1208. |> restrict_instance(opts)
  1209. |> restrict_announce_object_actor(opts)
  1210. |> restrict_filtered(opts)
  1211. |> restrict_rule(opts)
  1212. |> restrict_quote_url(opts)
  1213. |> maybe_restrict_deactivated_users(opts)
  1214. |> exclude_poll_votes(opts)
  1215. |> exclude_chat_messages(opts)
  1216. |> exclude_invisible_actors(opts)
  1217. |> exclude_visibility(opts)
  1218. if Config.feature_enabled?(:improved_hashtag_timeline) do
  1219. query
  1220. |> restrict_hashtag_any(opts)
  1221. |> restrict_hashtag_all(opts)
  1222. |> restrict_hashtag_reject_any(opts)
  1223. else
  1224. query
  1225. |> restrict_embedded_tag_any(opts)
  1226. |> restrict_embedded_tag_all(opts)
  1227. |> restrict_embedded_tag_reject_any(opts)
  1228. end
  1229. end
  1230. @doc """
  1231. Fetch favorites activities of user with order by sort adds to favorites
  1232. """
  1233. @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
  1234. def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  1235. user.ap_id
  1236. |> Activity.Queries.by_actor()
  1237. |> Activity.Queries.by_type("Like")
  1238. |> Activity.with_joined_object()
  1239. |> Object.with_joined_activity()
  1240. |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  1241. |> order_by([like, _, _], desc_nulls_last: like.id)
  1242. |> Pagination.fetch_paginated(
  1243. Map.merge(params, %{skip_order: true}),
  1244. pagination
  1245. )
  1246. end
  1247. defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  1248. Enum.map(activities, fn
  1249. %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
  1250. if Enum.any?(bcc, &(&1 in list_memberships)) do
  1251. update_in(activity.data["cc"], &[user_ap_id | &1])
  1252. else
  1253. activity
  1254. end
  1255. activity ->
  1256. activity
  1257. end)
  1258. end
  1259. defp maybe_update_cc(activities, _, _), do: activities
  1260. defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  1261. from(activity in query,
  1262. where:
  1263. fragment("? && ?", activity.recipients, ^recipients) or
  1264. (fragment("? && ?", activity.recipients, ^recipients_with_public) and
  1265. ^Constants.as_public() in activity.recipients)
  1266. )
  1267. end
  1268. def fetch_activities_bounded(
  1269. recipients,
  1270. recipients_with_public,
  1271. opts \\ %{},
  1272. pagination \\ :keyset
  1273. ) do
  1274. fetch_activities_query([], opts)
  1275. |> fetch_activities_bounded_query(recipients, recipients_with_public)
  1276. |> Pagination.fetch_paginated(opts, pagination)
  1277. |> Enum.reverse()
  1278. end
  1279. @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
  1280. def upload(file, opts \\ []) do
  1281. with {:ok, data} <- Upload.store(sanitize_upload_file(file), opts) do
  1282. obj_data = Maps.put_if_present(data, "actor", opts[:actor])
  1283. Repo.insert(%Object{data: obj_data})
  1284. end
  1285. end
  1286. defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
  1287. %Plug.Upload{
  1288. upload
  1289. | filename: Path.basename(filename)
  1290. }
  1291. end
  1292. defp sanitize_upload_file(upload), do: upload
  1293. @spec get_actor_url(any()) :: binary() | nil
  1294. defp get_actor_url(url) when is_binary(url), do: url
  1295. defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
  1296. defp get_actor_url(url) when is_list(url) do
  1297. url
  1298. |> List.first()
  1299. |> get_actor_url()
  1300. end
  1301. defp get_actor_url(_url), do: nil
  1302. defp normalize_image(%{"url" => url} = data) do
  1303. %{
  1304. "type" => "Image",
  1305. "url" => [%{"href" => url}]
  1306. }
  1307. |> maybe_put_description(data)
  1308. end
  1309. defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
  1310. defp normalize_image(_), do: nil
  1311. defp maybe_put_description(map, %{"name" => description}) when is_binary(description) do
  1312. Map.put(map, "name", description)
  1313. end
  1314. defp maybe_put_description(map, _), do: map
  1315. defp object_to_user_data(data, additional) do
  1316. fields =
  1317. data
  1318. |> Map.get("attachment", [])
  1319. |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
  1320. |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
  1321. emojis =
  1322. data
  1323. |> Map.get("tag", [])
  1324. |> Enum.filter(fn
  1325. %{"type" => "Emoji"} -> true
  1326. _ -> false
  1327. end)
  1328. |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
  1329. {String.trim(name, ":"), url}
  1330. end)
  1331. is_locked = data["manuallyApprovesFollowers"] || false
  1332. capabilities = data["capabilities"] || %{}
  1333. accepts_chat_messages = capabilities["acceptsChatMessages"]
  1334. data = Transmogrifier.maybe_fix_user_object(data)
  1335. is_discoverable = data["discoverable"] || false
  1336. invisible = data["invisible"] || false
  1337. actor_type = data["type"] || "Person"
  1338. featured_address = data["featured"]
  1339. {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
  1340. public_key =
  1341. if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
  1342. data["publicKey"]["publicKeyPem"]
  1343. end
  1344. shared_inbox =
  1345. if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
  1346. data["endpoints"]["sharedInbox"]
  1347. end
  1348. birthday =
  1349. if is_binary(data["vcard:bday"]) do
  1350. case Date.from_iso8601(data["vcard:bday"]) do
  1351. {:ok, date} -> date
  1352. {:error, _} -> nil
  1353. end
  1354. end
  1355. show_birthday = !!birthday
  1356. # if WebFinger request was already done, we probably have acct, otherwise
  1357. # we request WebFinger here
  1358. nickname = additional[:nickname_from_acct] || generate_nickname(data)
  1359. %{
  1360. ap_id: data["id"],
  1361. uri: get_actor_url(data["url"]),
  1362. banner: normalize_image(data["image"]),
  1363. fields: fields,
  1364. emoji: emojis,
  1365. is_locked: is_locked,
  1366. is_discoverable: is_discoverable,
  1367. invisible: invisible,
  1368. avatar: normalize_image(data["icon"]),
  1369. name: data["name"],
  1370. follower_address: data["followers"],
  1371. following_address: data["following"],
  1372. featured_address: featured_address,
  1373. bio: data["summary"] || "",
  1374. actor_type: actor_type,
  1375. also_known_as: Map.get(data, "alsoKnownAs", []),
  1376. public_key: public_key,
  1377. inbox: data["inbox"],
  1378. shared_inbox: shared_inbox,
  1379. accepts_chat_messages: accepts_chat_messages,
  1380. birthday: birthday,
  1381. show_birthday: show_birthday,
  1382. pinned_objects: pinned_objects,
  1383. nickname: nickname
  1384. }
  1385. end
  1386. defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  1387. generated = "#{username}@#{URI.parse(data["id"]).host}"
  1388. if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
  1389. case WebFinger.finger(generated) do
  1390. {:ok, %{"subject" => "acct:" <> acct}} -> acct
  1391. _ -> generated
  1392. end
  1393. else
  1394. generated
  1395. end
  1396. end
  1397. # nickname can be nil because of virtual actors
  1398. defp generate_nickname(_), do: nil
  1399. def fetch_follow_information_for_user(user) do
  1400. with {:ok, following_data} <-
  1401. Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
  1402. {:ok, hide_follows} <- collection_private(following_data),
  1403. {:ok, followers_data} <-
  1404. Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
  1405. {:ok, hide_followers} <- collection_private(followers_data) do
  1406. {:ok,
  1407. %{
  1408. hide_follows: hide_follows,
  1409. follower_count: normalize_counter(followers_data["totalItems"]),
  1410. following_count: normalize_counter(following_data["totalItems"]),
  1411. hide_followers: hide_followers
  1412. }}
  1413. else
  1414. {:error, _} = e -> e
  1415. end
  1416. end
  1417. defp normalize_counter(counter) when is_integer(counter), do: counter
  1418. defp normalize_counter(_), do: 0
  1419. def maybe_update_follow_information(user_data) do
  1420. with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
  1421. {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
  1422. {_, true} <-
  1423. {:collections_available,
  1424. !!(user_data[:following_address] && user_data[:follower_address])},
  1425. {:ok, info} <-
  1426. fetch_follow_information_for_user(user_data) do
  1427. info = Map.merge(user_data[:info] || %{}, info)
  1428. user_data
  1429. |> Map.put(:info, info)
  1430. else
  1431. {:user_type_check, false} ->
  1432. user_data
  1433. {:collections_available, false} ->
  1434. user_data
  1435. {:enabled, false} ->
  1436. user_data
  1437. e ->
  1438. Logger.error(
  1439. "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
  1440. )
  1441. user_data
  1442. end
  1443. end
  1444. defp collection_private(%{"first" => %{"type" => type}})
  1445. when type in ["CollectionPage", "OrderedCollectionPage"],
  1446. do: {:ok, false}
  1447. defp collection_private(%{"first" => first}) do
  1448. with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
  1449. Fetcher.fetch_and_contain_remote_object_from_id(first) do
  1450. {:ok, false}
  1451. else
  1452. {:error, _} -> {:ok, true}
  1453. end
  1454. end
  1455. defp collection_private(_data), do: {:ok, true}
  1456. def user_data_from_user_object(data, additional \\ []) do
  1457. with {:ok, data} <- MRF.filter(data) do
  1458. {:ok, object_to_user_data(data, additional)}
  1459. else
  1460. e -> {:error, e}
  1461. end
  1462. end
  1463. defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
  1464. with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
  1465. {:ok, data} <- user_data_from_user_object(data, additional) do
  1466. {:ok, maybe_update_follow_information(data)}
  1467. else
  1468. # If this has been deleted, only log a debug and not an error
  1469. {:error, "Object has been deleted" = e} ->
  1470. Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1471. {:error, e}
  1472. {:error, {:reject, reason} = e} ->
  1473. Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
  1474. {:error, e}
  1475. {:error, e} ->
  1476. Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1477. {:error, e}
  1478. end
  1479. end
  1480. def maybe_handle_clashing_nickname(data) do
  1481. with nickname when is_binary(nickname) <- data[:nickname],
  1482. %User{} = old_user <- User.get_by_nickname(nickname),
  1483. {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
  1484. Logger.info(
  1485. "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
  1486. )
  1487. old_user
  1488. |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
  1489. |> User.update_and_set_cache()
  1490. else
  1491. {:ap_id_comparison, true} ->
  1492. Logger.info(
  1493. "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
  1494. )
  1495. _ ->
  1496. nil
  1497. end
  1498. end
  1499. def pin_data_from_featured_collection(%{
  1500. "type" => type,
  1501. "orderedItems" => objects
  1502. })
  1503. when type in ["OrderedCollection", "Collection"] do
  1504. Map.new(objects, fn
  1505. %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
  1506. object_ap_id when is_binary(object_ap_id) -> {object_ap_id, NaiveDateTime.utc_now()}
  1507. end)
  1508. end
  1509. def pin_data_from_featured_collection(obj) do
  1510. Logger.error("Could not parse featured collection #{inspect(obj)}")
  1511. %{}
  1512. end
  1513. def fetch_and_prepare_featured_from_ap_id(nil) do
  1514. {:ok, %{}}
  1515. end
  1516. def fetch_and_prepare_featured_from_ap_id(ap_id) do
  1517. with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
  1518. {:ok, pin_data_from_featured_collection(data)}
  1519. else
  1520. e ->
  1521. Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
  1522. {:ok, %{}}
  1523. end
  1524. end
  1525. def enqueue_pin_fetches(%{pinned_objects: pins}) do
  1526. # enqueue a task to fetch all pinned objects
  1527. Enum.each(pins, fn {ap_id, _} ->
  1528. if is_nil(Object.get_cached_by_ap_id(ap_id)) do
  1529. Pleroma.Workers.RemoteFetcherWorker.new(%{
  1530. "op" => "fetch_remote",
  1531. "id" => ap_id,
  1532. "depth" => 1
  1533. })
  1534. |> Oban.insert()
  1535. end
  1536. end)
  1537. end
  1538. def enqueue_pin_fetches(_), do: nil
  1539. def make_user_from_ap_id(ap_id, additional \\ []) do
  1540. user = User.get_cached_by_ap_id(ap_id)
  1541. with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
  1542. enqueue_pin_fetches(data)
  1543. if user do
  1544. user
  1545. |> User.remote_user_changeset(data)
  1546. |> User.update_and_set_cache()
  1547. else
  1548. maybe_handle_clashing_nickname(data)
  1549. data
  1550. |> User.remote_user_changeset()
  1551. |> Repo.insert()
  1552. |> User.set_cache()
  1553. end
  1554. end
  1555. end
  1556. def make_user_from_nickname(nickname) do
  1557. with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
  1558. WebFinger.finger(nickname) do
  1559. make_user_from_ap_id(ap_id, nickname_from_acct: acct)
  1560. else
  1561. _e -> {:error, "No AP id in WebFinger"}
  1562. end
  1563. end
  1564. # filter out broken threads
  1565. defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
  1566. entire_thread_visible_for_user?(activity, user)
  1567. end
  1568. # do post-processing on a specific activity
  1569. def contain_activity(%Activity{} = activity, %User{} = user) do
  1570. contain_broken_threads(activity, user)
  1571. end
  1572. def fetch_direct_messages_query do
  1573. Activity
  1574. |> restrict_type(%{type: "Create"})
  1575. |> restrict_visibility(%{visibility: "direct"})
  1576. |> order_by([activity], asc: activity.id)
  1577. end
  1578. defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
  1579. defp maybe_restrict_deactivated_users(activity, _opts),
  1580. do: Activity.restrict_deactivated_users(activity)
  1581. end