logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma git clone https://hacktivis.me/git/pleroma.git

activity_pub.ex (55871B)


  1. # Pleroma: A lightweight social networking server
  2. # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
  3. # SPDX-License-Identifier: AGPL-3.0-only
  4. defmodule Pleroma.Web.ActivityPub.ActivityPub do
  5. alias Pleroma.Activity
  6. alias Pleroma.Activity.Ir.Topics
  7. alias Pleroma.Config
  8. alias Pleroma.Constants
  9. alias Pleroma.Conversation
  10. alias Pleroma.Conversation.Participation
  11. alias Pleroma.Filter
  12. alias Pleroma.Hashtag
  13. alias Pleroma.Maps
  14. alias Pleroma.Notification
  15. alias Pleroma.Object
  16. alias Pleroma.Object.Containment
  17. alias Pleroma.Object.Fetcher
  18. alias Pleroma.Pagination
  19. alias Pleroma.Repo
  20. alias Pleroma.Upload
  21. alias Pleroma.User
  22. alias Pleroma.Web.ActivityPub.MRF
  23. alias Pleroma.Web.ActivityPub.Transmogrifier
  24. alias Pleroma.Web.Streamer
  25. alias Pleroma.Web.WebFinger
  26. alias Pleroma.Workers.BackgroundWorker
  27. alias Pleroma.Workers.PollWorker
  28. import Ecto.Query
  29. import Pleroma.Web.ActivityPub.Utils
  30. import Pleroma.Web.ActivityPub.Visibility
  31. require Logger
  32. require Pleroma.Constants
  33. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
  34. @behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
  35. defp get_recipients(%{"type" => "Create"} = data) do
  36. to = Map.get(data, "to", [])
  37. cc = Map.get(data, "cc", [])
  38. bcc = Map.get(data, "bcc", [])
  39. actor = Map.get(data, "actor", [])
  40. recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq()
  41. {recipients, to, cc}
  42. end
  43. defp get_recipients(data) do
  44. to = Map.get(data, "to", [])
  45. cc = Map.get(data, "cc", [])
  46. bcc = Map.get(data, "bcc", [])
  47. recipients = Enum.concat([to, cc, bcc])
  48. {recipients, to, cc}
  49. end
  50. defp check_actor_can_insert(%{"type" => "Delete"}), do: true
  51. defp check_actor_can_insert(%{"type" => "Undo"}), do: true
  52. defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  53. case User.get_cached_by_ap_id(actor) do
  54. %User{is_active: true} -> true
  55. _ -> false
  56. end
  57. end
  58. defp check_actor_can_insert(_), do: true
  59. defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do
  60. limit = Config.get([:instance, :remote_limit])
  61. String.length(content) <= limit
  62. end
  63. defp check_remote_limit(_), do: true
  64. def increase_note_count_if_public(actor, object) do
  65. if public?(object), do: User.increase_note_count(actor), else: {:ok, actor}
  66. end
  67. def decrease_note_count_if_public(actor, object) do
  68. if public?(object), do: User.decrease_note_count(actor), else: {:ok, actor}
  69. end
  70. def update_last_status_at_if_public(actor, object) do
  71. if public?(object), do: User.update_last_status_at(actor), else: {:ok, actor}
  72. end
  73. defp increase_replies_count_if_reply(%{
  74. "object" => %{"inReplyTo" => reply_ap_id} = object,
  75. "type" => "Create"
  76. }) do
  77. if public?(object) do
  78. Object.increase_replies_count(reply_ap_id)
  79. end
  80. end
  81. defp increase_replies_count_if_reply(_create_data), do: :noop
  82. defp increase_quotes_count_if_quote(%{
  83. "object" => %{"quoteUrl" => quote_ap_id} = object,
  84. "type" => "Create"
  85. }) do
  86. if public?(object) do
  87. Object.increase_quotes_count(quote_ap_id)
  88. end
  89. end
  90. defp increase_quotes_count_if_quote(_create_data), do: :noop
  91. @object_types ~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]
  92. @impl true
  93. def persist(%{"type" => type} = object, meta) when type in @object_types do
  94. with {:ok, object} <- Object.create(object) do
  95. {:ok, object, meta}
  96. end
  97. end
  98. @impl true
  99. def persist(object, meta) do
  100. with local <- Keyword.fetch!(meta, :local),
  101. {recipients, _, _} <- get_recipients(object),
  102. {:ok, activity} <-
  103. Repo.insert(%Activity{
  104. data: object,
  105. local: local,
  106. recipients: recipients,
  107. actor: object["actor"]
  108. }),
  109. # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  110. {:ok, _} <- maybe_create_activity_expiration(activity) do
  111. {:ok, activity, meta}
  112. end
  113. end
  114. @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  115. def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  116. with nil <- Activity.normalize(map),
  117. map <- lazy_put_activity_defaults(map, fake),
  118. {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(map)},
  119. {_, true} <- {:remote_limit_pass, check_remote_limit(map)},
  120. {:ok, map} <- MRF.filter(map),
  121. {recipients, _, _} = get_recipients(map),
  122. {:fake, false, map, recipients} <- {:fake, fake, map, recipients},
  123. {:containment, :ok} <- {:containment, Containment.contain_child(map)},
  124. {:ok, map, object} <- insert_full_object(map),
  125. {:ok, activity} <- insert_activity_with_expiration(map, local, recipients) do
  126. # Splice in the child object if we have one.
  127. activity = Maps.put_if_present(activity, :object, object)
  128. ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
  129. Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
  130. end)
  131. # Add local posts to search index
  132. if local, do: Pleroma.Search.add_to_index(activity)
  133. {:ok, activity}
  134. else
  135. %Activity{} = activity ->
  136. {:ok, activity}
  137. {:actor_check, _} ->
  138. {:error, false}
  139. {:containment, _} = error ->
  140. error
  141. {:error, _} = error ->
  142. error
  143. {:fake, true, map, recipients} ->
  144. activity = %Activity{
  145. data: map,
  146. local: local,
  147. actor: map["actor"],
  148. recipients: recipients,
  149. id: "pleroma:fakeid"
  150. }
  151. Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity)
  152. {:ok, activity}
  153. {:remote_limit_pass, _} ->
  154. {:error, :remote_limit}
  155. {:reject, _} = e ->
  156. {:error, e}
  157. end
  158. end
  159. defp insert_activity_with_expiration(data, local, recipients) do
  160. struct = %Activity{
  161. data: data,
  162. local: local,
  163. actor: data["actor"],
  164. recipients: recipients
  165. }
  166. with {:ok, activity} <- Repo.insert(struct) do
  167. maybe_create_activity_expiration(activity)
  168. end
  169. end
  170. def notify_and_stream(activity) do
  171. Notification.create_notifications(activity)
  172. original_activity =
  173. case activity do
  174. %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
  175. Activity.get_create_by_object_ap_id_with_object(id)
  176. _ ->
  177. activity
  178. end
  179. conversation = create_or_bump_conversation(original_activity, original_activity.actor)
  180. participations = get_participations(conversation)
  181. stream_out(activity)
  182. stream_out_participations(participations)
  183. end
  184. defp maybe_create_activity_expiration(
  185. %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
  186. ) do
  187. with {:ok, _job} <-
  188. Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
  189. activity_id: activity.id,
  190. expires_at: expires_at
  191. }) do
  192. {:ok, activity}
  193. end
  194. end
  195. defp maybe_create_activity_expiration(activity), do: {:ok, activity}
  196. defp create_or_bump_conversation(activity, actor) do
  197. with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
  198. %User{} = user <- User.get_cached_by_ap_id(actor) do
  199. Participation.mark_as_read(user, conversation)
  200. {:ok, conversation}
  201. end
  202. end
  203. defp get_participations({:ok, conversation}) do
  204. conversation
  205. |> Repo.preload(:participations, force: true)
  206. |> Map.get(:participations)
  207. end
  208. defp get_participations(_), do: []
  209. def stream_out_participations(participations) do
  210. participations =
  211. participations
  212. |> Repo.preload(:user)
  213. Streamer.stream("participation", participations)
  214. end
  215. @impl true
  216. def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  217. with %Conversation{} = conversation <- Conversation.get_for_ap_id(context) do
  218. conversation = Repo.preload(conversation, :participations)
  219. last_activity_id =
  220. fetch_latest_direct_activity_id_for_context(conversation.ap_id, %{
  221. user: user,
  222. blocking_user: user
  223. })
  224. if last_activity_id do
  225. stream_out_participations(conversation.participations)
  226. end
  227. end
  228. end
  229. @impl true
  230. def stream_out_participations(_, _), do: :noop
  231. @impl true
  232. def stream_out(%Activity{data: %{"type" => data_type}} = activity)
  233. when data_type in ["Create", "Announce", "Delete", "Update"] do
  234. activity
  235. |> Topics.get_activity_topics()
  236. |> Streamer.stream(activity)
  237. end
  238. @impl true
  239. def stream_out(_activity) do
  240. :noop
  241. end
  242. @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  243. def create(params, fake \\ false) do
  244. with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do
  245. result
  246. end
  247. end
  248. defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  249. additional = params[:additional] || %{}
  250. # only accept false as false value
  251. local = !(params[:local] == false)
  252. published = params[:published]
  253. quick_insert? = Config.get([:env]) == :benchmark
  254. create_data =
  255. make_create_data(
  256. %{to: to, actor: actor, published: published, context: context, object: object},
  257. additional
  258. )
  259. with {:ok, activity} <- insert(create_data, local, fake),
  260. {:fake, false, activity} <- {:fake, fake, activity},
  261. _ <- increase_replies_count_if_reply(create_data),
  262. _ <- increase_quotes_count_if_quote(create_data),
  263. {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
  264. {:ok, _actor} <- increase_note_count_if_public(actor, activity),
  265. {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
  266. _ <- notify_and_stream(activity),
  267. :ok <- maybe_schedule_poll_notifications(activity),
  268. :ok <- maybe_handle_group_posts(activity),
  269. :ok <- maybe_federate(activity) do
  270. {:ok, activity}
  271. else
  272. {:quick_insert, true, activity} ->
  273. {:ok, activity}
  274. {:fake, true, activity} ->
  275. {:ok, activity}
  276. {:error, message} ->
  277. Repo.rollback(message)
  278. end
  279. end
  280. defp maybe_schedule_poll_notifications(activity) do
  281. PollWorker.schedule_poll_end(activity)
  282. :ok
  283. end
  284. @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
  285. def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  286. additional = params[:additional] || %{}
  287. # only accept false as false value
  288. local = !(params[:local] == false)
  289. published = params[:published]
  290. listen_data =
  291. make_listen_data(
  292. %{to: to, actor: actor, published: published, context: context, object: object},
  293. additional
  294. )
  295. with {:ok, activity} <- insert(listen_data, local),
  296. _ <- notify_and_stream(activity),
  297. :ok <- maybe_federate(activity) do
  298. {:ok, activity}
  299. end
  300. end
  301. @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
  302. {:ok, Activity.t()} | nil | {:error, any()}
  303. def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  304. with {:ok, result} <-
  305. Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do
  306. result
  307. end
  308. end
  309. defp do_unfollow(follower, followed, activity_id, local) do
  310. with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed),
  311. {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
  312. unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
  313. {:ok, activity} <- insert(unfollow_data, local),
  314. _ <- notify_and_stream(activity),
  315. :ok <- maybe_federate(activity) do
  316. {:ok, activity}
  317. else
  318. nil -> nil
  319. {:error, error} -> Repo.rollback(error)
  320. end
  321. end
  322. @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
  323. def flag(params) do
  324. with {:ok, result} <- Repo.transaction(fn -> do_flag(params) end) do
  325. result
  326. end
  327. end
  328. defp do_flag(
  329. %{
  330. actor: actor,
  331. context: _context,
  332. account: account,
  333. statuses: statuses,
  334. content: content
  335. } = params
  336. ) do
  337. # only accept false as false value
  338. local = !(params[:local] == false)
  339. forward = !(params[:forward] == false)
  340. additional = params[:additional] || %{}
  341. additional =
  342. if forward do
  343. Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]})
  344. else
  345. Map.merge(additional, %{"to" => [], "cc" => []})
  346. end
  347. with flag_data <- make_flag_data(params, additional),
  348. {:ok, activity} <- insert(flag_data, local),
  349. {:ok, stripped_activity} <- strip_report_status_data(activity),
  350. _ <- notify_and_stream(activity),
  351. :ok <-
  352. maybe_federate(stripped_activity) do
  353. User.all_users_with_privilege(:reports_manage_reports)
  354. |> Enum.filter(fn user -> user.ap_id != actor end)
  355. |> Enum.filter(fn user -> not is_nil(user.email) end)
  356. |> Enum.each(fn privileged_user ->
  357. privileged_user
  358. |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
  359. |> Pleroma.Emails.Mailer.deliver_async()
  360. end)
  361. {:ok, activity}
  362. else
  363. {:error, error} -> Repo.rollback(error)
  364. end
  365. end
  366. @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
  367. def move(%User{} = origin, %User{} = target, local \\ true) do
  368. params = %{
  369. "type" => "Move",
  370. "actor" => origin.ap_id,
  371. "object" => origin.ap_id,
  372. "target" => target.ap_id,
  373. "to" => [origin.follower_address]
  374. }
  375. with true <- origin.ap_id in target.also_known_as,
  376. {:ok, activity} <- insert(params, local),
  377. _ <- notify_and_stream(activity) do
  378. maybe_federate(activity)
  379. BackgroundWorker.enqueue("move_following", %{
  380. "origin_id" => origin.id,
  381. "target_id" => target.id
  382. })
  383. {:ok, activity}
  384. else
  385. false -> {:error, "Target account must have the origin in `alsoKnownAs`"}
  386. err -> err
  387. end
  388. end
  389. def fetch_activities_for_context_query(context, opts) do
  390. public = [Constants.as_public()]
  391. recipients =
  392. if opts[:user],
  393. do: [opts[:user].ap_id | User.following(opts[:user])] ++ public,
  394. else: public
  395. from(activity in Activity)
  396. |> maybe_preload_objects(opts)
  397. |> maybe_preload_bookmarks(opts)
  398. |> maybe_set_thread_muted_field(opts)
  399. |> restrict_unauthenticated(opts[:user])
  400. |> restrict_blocked(opts)
  401. |> restrict_blockers_visibility(opts)
  402. |> restrict_recipients(recipients, opts[:user])
  403. |> restrict_filtered(opts)
  404. |> where(
  405. [activity],
  406. fragment(
  407. "?->>'type' = ? and ?->>'context' = ?",
  408. activity.data,
  409. "Create",
  410. activity.data,
  411. ^context
  412. )
  413. )
  414. |> exclude_poll_votes(opts)
  415. |> exclude_id(opts)
  416. |> order_by([activity], desc: activity.id)
  417. end
  418. @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
  419. def fetch_activities_for_context(context, opts \\ %{}) do
  420. context
  421. |> fetch_activities_for_context_query(opts)
  422. |> Repo.all()
  423. end
  424. @spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
  425. Ecto.UUID.t() | nil
  426. def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  427. context
  428. |> fetch_activities_for_context_query(Map.merge(%{skip_preload: true}, opts))
  429. |> restrict_visibility(%{visibility: "direct"})
  430. |> limit(1)
  431. |> select([a], a.id)
  432. |> Repo.one()
  433. end
  434. defp fetch_paginated_optimized(query, opts, pagination) do
  435. # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  436. # and extra sorting on "activities.id DESC NULLS LAST" would worse the query plan
  437. opts = Map.put(opts, :skip_extra_order, true)
  438. Pagination.fetch_paginated(query, opts, pagination)
  439. end
  440. def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  441. list_memberships = Pleroma.List.memberships(opts[:user])
  442. fetch_activities_query(recipients ++ list_memberships, opts)
  443. |> fetch_paginated_optimized(opts, pagination)
  444. |> Enum.reverse()
  445. |> maybe_update_cc(list_memberships, opts[:user])
  446. end
  447. @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
  448. def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  449. includes_local_public = Map.get(opts, :includes_local_public, false)
  450. opts = Map.delete(opts, :user)
  451. intended_recipients =
  452. if includes_local_public do
  453. [Constants.as_public(), as_local_public()]
  454. else
  455. [Constants.as_public()]
  456. end
  457. intended_recipients
  458. |> fetch_activities_query(opts)
  459. |> restrict_unlisted(opts)
  460. |> fetch_paginated_optimized(opts, pagination)
  461. end
  462. @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
  463. def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  464. opts
  465. |> Map.put(:restrict_unlisted, true)
  466. |> fetch_public_or_unlisted_activities(pagination)
  467. end
  468. @valid_visibilities ~w[direct unlisted public private]
  469. defp restrict_visibility(query, %{visibility: visibility})
  470. when is_list(visibility) do
  471. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  472. from(
  473. a in query,
  474. where:
  475. fragment(
  476. "activity_visibility(?, ?, ?) = ANY (?)",
  477. a.actor,
  478. a.recipients,
  479. a.data,
  480. ^visibility
  481. )
  482. )
  483. else
  484. Logger.error("Could not restrict visibility to #{visibility}")
  485. end
  486. end
  487. defp restrict_visibility(query, %{visibility: visibility})
  488. when visibility in @valid_visibilities do
  489. from(
  490. a in query,
  491. where:
  492. fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  493. )
  494. end
  495. defp restrict_visibility(_query, %{visibility: visibility})
  496. when visibility not in @valid_visibilities do
  497. Logger.error("Could not restrict visibility to #{visibility}")
  498. end
  499. defp restrict_visibility(query, _visibility), do: query
  500. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  501. when is_list(visibility) do
  502. if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
  503. from(
  504. a in query,
  505. where:
  506. not fragment(
  507. "activity_visibility(?, ?, ?) = ANY (?)",
  508. a.actor,
  509. a.recipients,
  510. a.data,
  511. ^visibility
  512. )
  513. )
  514. else
  515. Logger.error("Could not exclude visibility to #{visibility}")
  516. query
  517. end
  518. end
  519. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  520. when visibility in @valid_visibilities do
  521. from(
  522. a in query,
  523. where:
  524. not fragment(
  525. "activity_visibility(?, ?, ?) = ?",
  526. a.actor,
  527. a.recipients,
  528. a.data,
  529. ^visibility
  530. )
  531. )
  532. end
  533. defp exclude_visibility(query, %{exclude_visibilities: visibility})
  534. when visibility not in [nil | @valid_visibilities] do
  535. Logger.error("Could not exclude visibility to #{visibility}")
  536. query
  537. end
  538. defp exclude_visibility(query, _visibility), do: query
  539. defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  540. do: query
  541. defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  542. do: query
  543. defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  544. local_public = as_local_public()
  545. from(
  546. a in query,
  547. where: fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  548. )
  549. end
  550. defp restrict_thread_visibility(query, _, _), do: query
  551. def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  552. params =
  553. params
  554. |> Map.put(:user, reading_user)
  555. |> Map.put(:actor_id, user.ap_id)
  556. %{
  557. godmode: params[:godmode],
  558. reading_user: reading_user
  559. }
  560. |> user_activities_recipients()
  561. |> fetch_activities(params)
  562. |> Enum.reverse()
  563. end
  564. def fetch_user_activities(user, reading_user, params \\ %{})
  565. def fetch_user_activities(user, reading_user, %{total: true} = params) do
  566. result = fetch_activities_for_user(user, reading_user, params)
  567. Keyword.put(result, :items, Enum.reverse(result[:items]))
  568. end
  569. def fetch_user_activities(user, reading_user, params) do
  570. user
  571. |> fetch_activities_for_user(reading_user, params)
  572. |> Enum.reverse()
  573. end
  574. defp fetch_activities_for_user(user, reading_user, params) do
  575. params =
  576. params
  577. |> Map.put(:type, ["Create", "Announce"])
  578. |> Map.put(:user, reading_user)
  579. |> Map.put(:actor_id, user.ap_id)
  580. |> Map.put(:pinned_object_ids, Map.keys(user.pinned_objects))
  581. params =
  582. if User.blocks?(reading_user, user) do
  583. params
  584. else
  585. params
  586. |> Map.put(:blocking_user, reading_user)
  587. |> Map.put(:muting_user, reading_user)
  588. end
  589. pagination_type = Map.get(params, :pagination_type) || :keyset
  590. %{
  591. godmode: params[:godmode],
  592. reading_user: reading_user
  593. }
  594. |> user_activities_recipients()
  595. |> fetch_activities(params, pagination_type)
  596. end
  597. def fetch_statuses(reading_user, %{total: true} = params) do
  598. result = fetch_activities_for_reading_user(reading_user, params)
  599. Keyword.put(result, :items, Enum.reverse(result[:items]))
  600. end
  601. def fetch_statuses(reading_user, params) do
  602. reading_user
  603. |> fetch_activities_for_reading_user(params)
  604. |> Enum.reverse()
  605. end
  606. defp fetch_activities_for_reading_user(reading_user, params) do
  607. params = Map.put(params, :type, ["Create", "Announce"])
  608. %{
  609. godmode: params[:godmode],
  610. reading_user: reading_user
  611. }
  612. |> user_activities_recipients()
  613. |> fetch_activities(params, :offset)
  614. end
  615. defp user_activities_recipients(%{godmode: true}), do: []
  616. defp user_activities_recipients(%{reading_user: reading_user}) do
  617. if not is_nil(reading_user) and reading_user.local do
  618. [
  619. Constants.as_public(),
  620. as_local_public(),
  621. reading_user.ap_id | User.following(reading_user)
  622. ]
  623. else
  624. [Constants.as_public()]
  625. end
  626. end
  627. defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  628. raise "Can't use the child object without preloading!"
  629. end
  630. defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  631. from(
  632. [activity, object] in query,
  633. where:
  634. fragment(
  635. "?->>'type' != ? or ?->>'actor' != ?",
  636. activity.data,
  637. "Announce",
  638. object.data,
  639. ^actor
  640. )
  641. )
  642. end
  643. defp restrict_announce_object_actor(query, _), do: query
  644. defp restrict_since(query, %{since_id: ""}), do: query
  645. defp restrict_since(query, %{since_id: since_id}) do
  646. from(activity in query, where: activity.id > ^since_id)
  647. end
  648. defp restrict_since(query, _), do: query
  649. defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  650. raise_on_missing_preload()
  651. end
  652. defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  653. from(
  654. [_activity, object] in query,
  655. where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  656. )
  657. end
  658. defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
  659. restrict_embedded_tag_any(query, %{tag: tag})
  660. end
  661. defp restrict_embedded_tag_all(query, _), do: query
  662. defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  663. raise_on_missing_preload()
  664. end
  665. defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  666. from(
  667. [_activity, object] in query,
  668. where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
  669. )
  670. end
  671. defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
  672. restrict_embedded_tag_any(query, %{tag: [tag]})
  673. end
  674. defp restrict_embedded_tag_any(query, _), do: query
  675. defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  676. raise_on_missing_preload()
  677. end
  678. defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  679. from(
  680. [_activity, object] in query,
  681. where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  682. )
  683. end
  684. defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
  685. when is_binary(tag_reject) do
  686. restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
  687. end
  688. defp restrict_embedded_tag_reject_any(query, _), do: query
  689. defp object_ids_query_for_tags(tags) do
  690. from(hto in "hashtags_objects")
  691. |> join(:inner, [hto], ht in Pleroma.Hashtag, on: hto.hashtag_id == ht.id)
  692. |> where([hto, ht], ht.name in ^tags)
  693. |> select([hto], hto.object_id)
  694. |> distinct([hto], true)
  695. end
  696. defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  697. raise_on_missing_preload()
  698. end
  699. defp restrict_hashtag_all(query, %{tag_all: [single_tag]}) do
  700. restrict_hashtag_any(query, %{tag: single_tag})
  701. end
  702. defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  703. from(
  704. [_activity, object] in query,
  705. where:
  706. fragment(
  707. """
  708. (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
  709. ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
  710. AND hashtags_objects.object_id = ?) @> ?
  711. """,
  712. ^tags,
  713. object.id,
  714. ^tags
  715. )
  716. )
  717. end
  718. defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
  719. restrict_hashtag_all(query, %{tag_all: [tag]})
  720. end
  721. defp restrict_hashtag_all(query, _), do: query
  722. defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  723. raise_on_missing_preload()
  724. end
  725. defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  726. hashtag_ids =
  727. from(ht in Hashtag, where: ht.name in ^tags, select: ht.id)
  728. |> Repo.all()
  729. # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  730. from(
  731. [_activity, object] in query,
  732. join: hto in "hashtags_objects",
  733. on: hto.object_id == object.id,
  734. where: hto.hashtag_id in ^hashtag_ids,
  735. distinct: [desc: object.id],
  736. order_by: [desc: object.id]
  737. )
  738. end
  739. defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
  740. restrict_hashtag_any(query, %{tag: [tag]})
  741. end
  742. defp restrict_hashtag_any(query, _), do: query
  743. defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  744. raise_on_missing_preload()
  745. end
  746. defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  747. from(
  748. [_activity, object] in query,
  749. where: object.id not in subquery(object_ids_query_for_tags(tags_reject))
  750. )
  751. end
  752. defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
  753. restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
  754. end
  755. defp restrict_hashtag_reject_any(query, _), do: query
  756. defp raise_on_missing_preload do
  757. raise "Can't use the child object without preloading!"
  758. end
  759. defp restrict_recipients(query, [], _user), do: query
  760. defp restrict_recipients(query, recipients, nil) do
  761. from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients))
  762. end
  763. defp restrict_recipients(query, recipients, user) do
  764. from(
  765. activity in query,
  766. where: fragment("? && ?", ^recipients, activity.recipients),
  767. or_where: activity.actor == ^user.ap_id
  768. )
  769. end
  770. defp restrict_local(query, %{local_only: true}) do
  771. from(activity in query, where: activity.local == true)
  772. end
  773. defp restrict_local(query, _), do: query
  774. defp restrict_remote(query, %{remote: true}) do
  775. from(activity in query, where: activity.local == false)
  776. end
  777. defp restrict_remote(query, _), do: query
  778. defp restrict_actor(query, %{actor_id: actor_id}) do
  779. from(activity in query, where: activity.actor == ^actor_id)
  780. end
  781. defp restrict_actor(query, _), do: query
  782. defp restrict_type(query, %{type: type}) when is_binary(type) do
  783. from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type))
  784. end
  785. defp restrict_type(query, %{type: type}) do
  786. from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type))
  787. end
  788. defp restrict_type(query, _), do: query
  789. defp restrict_state(query, %{state: state}) do
  790. from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state))
  791. end
  792. defp restrict_state(query, _), do: query
  793. defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  794. from(
  795. [_activity, object] in query,
  796. where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  797. )
  798. end
  799. defp restrict_favorited_by(query, _), do: query
  800. defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  801. raise "Can't use the child object without preloading!"
  802. end
  803. defp restrict_media(query, %{only_media: true}) do
  804. from(
  805. [activity, object] in query,
  806. where: fragment("(?)->>'type' = ?", activity.data, "Create"),
  807. where: fragment("not (?)->'attachment' = (?)", object.data, ^[])
  808. )
  809. end
  810. defp restrict_media(query, _), do: query
  811. defp restrict_replies(query, %{exclude_replies: true}) do
  812. from(
  813. [_activity, object] in query,
  814. where: fragment("?->>'inReplyTo' is null", object.data)
  815. )
  816. end
  817. defp restrict_replies(query, %{
  818. reply_filtering_user: %User{} = user,
  819. reply_visibility: "self"
  820. }) do
  821. from(
  822. [activity, object] in query,
  823. where:
  824. fragment(
  825. "?->>'inReplyTo' is null OR ? = ANY(?)",
  826. object.data,
  827. ^user.ap_id,
  828. activity.recipients
  829. )
  830. )
  831. end
  832. defp restrict_replies(query, %{
  833. reply_filtering_user: %User{} = user,
  834. reply_visibility: "following"
  835. }) do
  836. from(
  837. [activity, object] in query,
  838. where:
  839. fragment(
  840. """
  841. ?->>'type' != 'Create' -- This isn't a Create
  842. OR ?->>'inReplyTo' is null -- this isn't a reply
  843. OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
  844. -- unless they are the author (because authors
  845. -- are also part of the recipients). This leads
  846. -- to a bug that self-replies by friends won't
  847. -- show up.
  848. OR ? = ? -- The actor is us
  849. """,
  850. activity.data,
  851. object.data,
  852. ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
  853. activity.recipients,
  854. activity.actor,
  855. activity.actor,
  856. ^user.ap_id
  857. )
  858. )
  859. end
  860. defp restrict_replies(query, _), do: query
  861. defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  862. from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data))
  863. end
  864. defp restrict_reblogs(query, _), do: query
  865. defp restrict_muted(query, %{with_muted: true}), do: query
  866. defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  867. mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)
  868. query =
  869. from([activity] in query,
  870. where: fragment("not (? = ANY(?))", activity.actor, ^mutes),
  871. where:
  872. fragment(
  873. "not (?->'to' \\?| ?) or ? = ?",
  874. activity.data,
  875. ^mutes,
  876. activity.actor,
  877. ^user.ap_id
  878. )
  879. )
  880. unless opts[:skip_preload] do
  881. from([thread_mute: tm] in query, where: is_nil(tm.user_id))
  882. else
  883. query
  884. end
  885. end
  886. defp restrict_muted(query, _), do: query
  887. defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  888. blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  889. domain_blocks = user.domain_blocks || []
  890. following_ap_ids = User.get_friends_ap_ids(user)
  891. query =
  892. if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
  893. from(
  894. [activity, object: o] in query,
  895. # You don't block the author
  896. where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
  897. # You don't block any recipients, and didn't author the post
  898. where:
  899. fragment(
  900. "((not (? && ?)) or ? = ?)",
  901. activity.recipients,
  902. ^blocked_ap_ids,
  903. activity.actor,
  904. ^user.ap_id
  905. ),
  906. # You don't block the domain of any recipients, and didn't author the post
  907. where:
  908. fragment(
  909. "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
  910. activity.recipients,
  911. ^domain_blocks,
  912. activity.actor,
  913. ^user.ap_id
  914. ),
  915. # It's not a boost of a user you block
  916. where:
  917. fragment(
  918. "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
  919. activity.data,
  920. activity.data,
  921. ^blocked_ap_ids
  922. ),
  923. # You don't block the author's domain, and also don't follow the author
  924. where:
  925. fragment(
  926. "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
  927. activity.actor,
  928. ^domain_blocks,
  929. activity.actor,
  930. ^following_ap_ids
  931. ),
  932. # Same as above, but checks the Object
  933. where:
  934. fragment(
  935. "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
  936. o.data,
  937. ^domain_blocks,
  938. o.data,
  939. ^following_ap_ids
  940. )
  941. )
  942. end
  943. defp restrict_blocked(query, _), do: query
  944. defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  945. if Config.get([:activitypub, :blockers_visible]) == true do
  946. query
  947. else
  948. blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])
  949. from(
  950. activity in query,
  951. # The author doesn't block you
  952. where: fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids),
  953. # It's not a boost of a user that blocks you
  954. where:
  955. fragment(
  956. "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
  957. activity.data,
  958. activity.data,
  959. ^blocker_ap_ids
  960. )
  961. )
  962. end
  963. end
  964. defp restrict_blockers_visibility(query, _), do: query
  965. defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  966. from(
  967. activity in query,
  968. where:
  969. fragment(
  970. "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
  971. activity.data,
  972. ^[Constants.as_public()]
  973. )
  974. )
  975. end
  976. defp restrict_unlisted(query, _), do: query
  977. defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  978. from(
  979. [activity, object: o] in query,
  980. where:
  981. fragment(
  982. "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
  983. activity.data,
  984. activity.data,
  985. ^ids
  986. )
  987. )
  988. end
  989. defp restrict_pinned(query, _), do: query
  990. defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  991. muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)
  992. from(
  993. activity in query,
  994. where:
  995. fragment(
  996. "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
  997. activity.data,
  998. activity.actor,
  999. ^muted_reblogs
  1000. )
  1001. )
  1002. end
  1003. defp restrict_muted_reblogs(query, _), do: query
  1004. defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  1005. from(
  1006. activity in query,
  1007. where: fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  1008. )
  1009. end
  1010. defp restrict_instance(query, _), do: query
  1011. defp restrict_filtered(query, %{user: %User{} = user}) do
  1012. case Filter.compose_regex(user) do
  1013. nil ->
  1014. query
  1015. regex ->
  1016. from([activity, object] in query,
  1017. where:
  1018. fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
  1019. activity.actor == ^user.ap_id
  1020. )
  1021. end
  1022. end
  1023. defp restrict_filtered(query, %{blocking_user: %User{} = user}) do
  1024. restrict_filtered(query, %{user: user})
  1025. end
  1026. defp restrict_filtered(query, _), do: query
  1027. defp restrict_unauthenticated(query, nil) do
  1028. local = Config.restrict_unauthenticated_access?(:activities, :local)
  1029. remote = Config.restrict_unauthenticated_access?(:activities, :remote)
  1030. cond do
  1031. local and remote ->
  1032. from(activity in query, where: false)
  1033. local ->
  1034. from(activity in query, where: activity.local == false)
  1035. remote ->
  1036. from(activity in query, where: activity.local == true)
  1037. true ->
  1038. query
  1039. end
  1040. end
  1041. defp restrict_unauthenticated(query, _), do: query
  1042. defp restrict_quote_url(query, %{quote_url: quote_url}) do
  1043. from([_activity, object] in query,
  1044. where: fragment("(?)->'quoteUrl' = ?", object.data, ^quote_url)
  1045. )
  1046. end
  1047. defp restrict_quote_url(query, _), do: query
  1048. defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query
  1049. defp exclude_poll_votes(query, _) do
  1050. if has_named_binding?(query, :object) do
  1051. from([activity, object: o] in query,
  1052. where: fragment("not(?->>'type' = ?)", o.data, "Answer")
  1053. )
  1054. else
  1055. query
  1056. end
  1057. end
  1058. defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query
  1059. defp exclude_chat_messages(query, _) do
  1060. if has_named_binding?(query, :object) do
  1061. from([activity, object: o] in query,
  1062. where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage")
  1063. )
  1064. else
  1065. query
  1066. end
  1067. end
  1068. defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
  1069. defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query
  1070. defp exclude_invisible_actors(query, _opts) do
  1071. query
  1072. |> join(:inner, [activity], u in User,
  1073. as: :u,
  1074. on: activity.actor == u.ap_id and u.invisible == false
  1075. )
  1076. end
  1077. defp exclude_id(query, %{exclude_id: id}) when is_binary(id) do
  1078. from(activity in query, where: activity.id != ^id)
  1079. end
  1080. defp exclude_id(query, _), do: query
  1081. defp maybe_preload_objects(query, %{skip_preload: true}), do: query
  1082. defp maybe_preload_objects(query, _) do
  1083. query
  1084. |> Activity.with_preloaded_object()
  1085. end
  1086. defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query
  1087. defp maybe_preload_bookmarks(query, opts) do
  1088. query
  1089. |> Activity.with_preloaded_bookmark(opts[:user])
  1090. end
  1091. defp maybe_preload_report_notes(query, %{preload_report_notes: true}) do
  1092. query
  1093. |> Activity.with_preloaded_report_notes()
  1094. end
  1095. defp maybe_preload_report_notes(query, _), do: query
  1096. defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query
  1097. defp maybe_set_thread_muted_field(query, opts) do
  1098. query
  1099. |> Activity.with_set_thread_muted_field(opts[:muting_user] || opts[:user])
  1100. end
  1101. defp maybe_order(query, %{order: :desc}) do
  1102. query
  1103. |> order_by(desc: :id)
  1104. end
  1105. defp maybe_order(query, %{order: :asc}) do
  1106. query
  1107. |> order_by(asc: :id)
  1108. end
  1109. defp maybe_order(query, _), do: query
  1110. defp normalize_fetch_activities_query_opts(opts) do
  1111. Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
  1112. case opts[key] do
  1113. value when is_bitstring(value) ->
  1114. Map.put(opts, key, Hashtag.normalize_name(value))
  1115. value when is_list(value) ->
  1116. normalized_value =
  1117. value
  1118. |> Enum.map(&Hashtag.normalize_name/1)
  1119. |> Enum.uniq()
  1120. Map.put(opts, key, normalized_value)
  1121. _ ->
  1122. opts
  1123. end
  1124. end)
  1125. end
  1126. defp fetch_activities_query_ap_ids_ops(opts) do
  1127. source_user = opts[:muting_user]
  1128. ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
  1129. ap_id_relationships =
  1130. if opts[:blocking_user] && opts[:blocking_user] == source_user do
  1131. [:block | ap_id_relationships]
  1132. else
  1133. ap_id_relationships
  1134. end
  1135. preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships)
  1136. restrict_blocked_opts = Map.merge(%{blocked_users_ap_ids: preloaded_ap_ids[:block]}, opts)
  1137. restrict_muted_opts = Map.merge(%{muted_users_ap_ids: preloaded_ap_ids[:mute]}, opts)
  1138. restrict_muted_reblogs_opts =
  1139. Map.merge(%{reblog_muted_users_ap_ids: preloaded_ap_ids[:reblog_mute]}, opts)
  1140. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts}
  1141. end
  1142. def fetch_activities_query(recipients, opts \\ %{}) do
  1143. opts = normalize_fetch_activities_query_opts(opts)
  1144. {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
  1145. fetch_activities_query_ap_ids_ops(opts)
  1146. config = %{
  1147. skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  1148. }
  1149. query =
  1150. Activity
  1151. |> maybe_preload_objects(opts)
  1152. |> maybe_preload_bookmarks(opts)
  1153. |> maybe_preload_report_notes(opts)
  1154. |> maybe_set_thread_muted_field(opts)
  1155. |> maybe_order(opts)
  1156. |> restrict_recipients(recipients, opts[:user])
  1157. |> restrict_replies(opts)
  1158. |> restrict_since(opts)
  1159. |> restrict_local(opts)
  1160. |> restrict_remote(opts)
  1161. |> restrict_actor(opts)
  1162. |> restrict_type(opts)
  1163. |> restrict_state(opts)
  1164. |> restrict_favorited_by(opts)
  1165. |> restrict_blocked(restrict_blocked_opts)
  1166. |> restrict_blockers_visibility(opts)
  1167. |> restrict_muted(restrict_muted_opts)
  1168. |> restrict_filtered(opts)
  1169. |> restrict_media(opts)
  1170. |> restrict_visibility(opts)
  1171. |> restrict_thread_visibility(opts, config)
  1172. |> restrict_reblogs(opts)
  1173. |> restrict_pinned(opts)
  1174. |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
  1175. |> restrict_instance(opts)
  1176. |> restrict_announce_object_actor(opts)
  1177. |> restrict_filtered(opts)
  1178. |> restrict_quote_url(opts)
  1179. |> maybe_restrict_deactivated_users(opts)
  1180. |> exclude_poll_votes(opts)
  1181. |> exclude_chat_messages(opts)
  1182. |> exclude_invisible_actors(opts)
  1183. |> exclude_visibility(opts)
  1184. if Config.feature_enabled?(:improved_hashtag_timeline) do
  1185. query
  1186. |> restrict_hashtag_any(opts)
  1187. |> restrict_hashtag_all(opts)
  1188. |> restrict_hashtag_reject_any(opts)
  1189. else
  1190. query
  1191. |> restrict_embedded_tag_any(opts)
  1192. |> restrict_embedded_tag_all(opts)
  1193. |> restrict_embedded_tag_reject_any(opts)
  1194. end
  1195. end
  1196. @doc """
  1197. Fetch favorites activities of user with order by sort adds to favorites
  1198. """
  1199. @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
  1200. def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  1201. user.ap_id
  1202. |> Activity.Queries.by_actor()
  1203. |> Activity.Queries.by_type("Like")
  1204. |> Activity.with_joined_object()
  1205. |> Object.with_joined_activity()
  1206. |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
  1207. |> order_by([like, _, _], desc_nulls_last: like.id)
  1208. |> Pagination.fetch_paginated(
  1209. Map.merge(params, %{skip_order: true}),
  1210. pagination
  1211. )
  1212. end
  1213. defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  1214. Enum.map(activities, fn
  1215. %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
  1216. if Enum.any?(bcc, &(&1 in list_memberships)) do
  1217. update_in(activity.data["cc"], &[user_ap_id | &1])
  1218. else
  1219. activity
  1220. end
  1221. activity ->
  1222. activity
  1223. end)
  1224. end
  1225. defp maybe_update_cc(activities, _, _), do: activities
  1226. defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  1227. from(activity in query,
  1228. where:
  1229. fragment("? && ?", activity.recipients, ^recipients) or
  1230. (fragment("? && ?", activity.recipients, ^recipients_with_public) and
  1231. ^Constants.as_public() in activity.recipients)
  1232. )
  1233. end
  1234. def fetch_activities_bounded(
  1235. recipients,
  1236. recipients_with_public,
  1237. opts \\ %{},
  1238. pagination \\ :keyset
  1239. ) do
  1240. fetch_activities_query([], opts)
  1241. |> fetch_activities_bounded_query(recipients, recipients_with_public)
  1242. |> Pagination.fetch_paginated(opts, pagination)
  1243. |> Enum.reverse()
  1244. end
  1245. @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
  1246. def upload(file, opts \\ []) do
  1247. with {:ok, data} <- Upload.store(sanitize_upload_file(file), opts) do
  1248. obj_data = Maps.put_if_present(data, "actor", opts[:actor])
  1249. Repo.insert(%Object{data: obj_data})
  1250. end
  1251. end
  1252. defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename) do
  1253. %Plug.Upload{
  1254. upload
  1255. | filename: Path.basename(filename)
  1256. }
  1257. end
  1258. defp sanitize_upload_file(upload), do: upload
  1259. @spec get_actor_url(any()) :: binary() | nil
  1260. defp get_actor_url(url) when is_binary(url), do: url
  1261. defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
  1262. defp get_actor_url(url) when is_list(url) do
  1263. url
  1264. |> List.first()
  1265. |> get_actor_url()
  1266. end
  1267. defp get_actor_url(_url), do: nil
  1268. defp normalize_image(%{"url" => url}) do
  1269. %{
  1270. "type" => "Image",
  1271. "url" => [%{"href" => url}]
  1272. }
  1273. end
  1274. defp normalize_image(urls) when is_list(urls), do: urls |> List.first() |> normalize_image()
  1275. defp normalize_image(_), do: nil
  1276. defp object_to_user_data(data, additional) do
  1277. fields =
  1278. data
  1279. |> Map.get("attachment", [])
  1280. |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end)
  1281. |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end)
  1282. emojis =
  1283. data
  1284. |> Map.get("tag", [])
  1285. |> Enum.filter(fn
  1286. %{"type" => "Emoji"} -> true
  1287. _ -> false
  1288. end)
  1289. |> Map.new(fn %{"icon" => %{"url" => url}, "name" => name} ->
  1290. {String.trim(name, ":"), url}
  1291. end)
  1292. is_locked = data["manuallyApprovesFollowers"] || false
  1293. capabilities = data["capabilities"] || %{}
  1294. accepts_chat_messages = capabilities["acceptsChatMessages"]
  1295. data = Transmogrifier.maybe_fix_user_object(data)
  1296. is_discoverable = data["discoverable"] || false
  1297. invisible = data["invisible"] || false
  1298. actor_type = data["type"] || "Person"
  1299. featured_address = data["featured"]
  1300. {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
  1301. public_key =
  1302. if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
  1303. data["publicKey"]["publicKeyPem"]
  1304. end
  1305. shared_inbox =
  1306. if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
  1307. data["endpoints"]["sharedInbox"]
  1308. end
  1309. birthday =
  1310. if is_binary(data["vcard:bday"]) do
  1311. case Date.from_iso8601(data["vcard:bday"]) do
  1312. {:ok, date} -> date
  1313. {:error, _} -> nil
  1314. end
  1315. end
  1316. show_birthday = !!birthday
  1317. # if WebFinger request was already done, we probably have acct, otherwise
  1318. # we request WebFinger here
  1319. nickname = additional[:nickname_from_acct] || generate_nickname(data)
  1320. %{
  1321. ap_id: data["id"],
  1322. uri: get_actor_url(data["url"]),
  1323. banner: normalize_image(data["image"]),
  1324. fields: fields,
  1325. emoji: emojis,
  1326. is_locked: is_locked,
  1327. is_discoverable: is_discoverable,
  1328. invisible: invisible,
  1329. avatar: normalize_image(data["icon"]),
  1330. name: data["name"],
  1331. follower_address: data["followers"],
  1332. following_address: data["following"],
  1333. featured_address: featured_address,
  1334. bio: data["summary"] || "",
  1335. actor_type: actor_type,
  1336. also_known_as: Map.get(data, "alsoKnownAs", []),
  1337. public_key: public_key,
  1338. inbox: data["inbox"],
  1339. shared_inbox: shared_inbox,
  1340. accepts_chat_messages: accepts_chat_messages,
  1341. birthday: birthday,
  1342. show_birthday: show_birthday,
  1343. pinned_objects: pinned_objects,
  1344. nickname: nickname
  1345. }
  1346. end
  1347. defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  1348. generated = "#{username}@#{URI.parse(data["id"]).host}"
  1349. if Config.get([WebFinger, :update_nickname_on_user_fetch]) do
  1350. case WebFinger.finger(generated) do
  1351. {:ok, %{"subject" => "acct:" <> acct}} -> acct
  1352. _ -> generated
  1353. end
  1354. else
  1355. generated
  1356. end
  1357. end
  1358. # nickname can be nil because of virtual actors
  1359. defp generate_nickname(_), do: nil
  1360. def fetch_follow_information_for_user(user) do
  1361. with {:ok, following_data} <-
  1362. Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
  1363. {:ok, hide_follows} <- collection_private(following_data),
  1364. {:ok, followers_data} <-
  1365. Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
  1366. {:ok, hide_followers} <- collection_private(followers_data) do
  1367. {:ok,
  1368. %{
  1369. hide_follows: hide_follows,
  1370. follower_count: normalize_counter(followers_data["totalItems"]),
  1371. following_count: normalize_counter(following_data["totalItems"]),
  1372. hide_followers: hide_followers
  1373. }}
  1374. else
  1375. {:error, _} = e -> e
  1376. e -> {:error, e}
  1377. end
  1378. end
  1379. defp normalize_counter(counter) when is_integer(counter), do: counter
  1380. defp normalize_counter(_), do: 0
  1381. def maybe_update_follow_information(user_data) do
  1382. with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
  1383. {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
  1384. {_, true} <-
  1385. {:collections_available,
  1386. !!(user_data[:following_address] && user_data[:follower_address])},
  1387. {:ok, info} <-
  1388. fetch_follow_information_for_user(user_data) do
  1389. info = Map.merge(user_data[:info] || %{}, info)
  1390. user_data
  1391. |> Map.put(:info, info)
  1392. else
  1393. {:user_type_check, false} ->
  1394. user_data
  1395. {:collections_available, false} ->
  1396. user_data
  1397. {:enabled, false} ->
  1398. user_data
  1399. e ->
  1400. Logger.error(
  1401. "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
  1402. )
  1403. user_data
  1404. end
  1405. end
  1406. defp collection_private(%{"first" => %{"type" => type}})
  1407. when type in ["CollectionPage", "OrderedCollectionPage"],
  1408. do: {:ok, false}
  1409. defp collection_private(%{"first" => first}) do
  1410. with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <-
  1411. Fetcher.fetch_and_contain_remote_object_from_id(first) do
  1412. {:ok, false}
  1413. else
  1414. {:error, _} -> {:ok, true}
  1415. end
  1416. end
  1417. defp collection_private(_data), do: {:ok, true}
  1418. def user_data_from_user_object(data, additional \\ []) do
  1419. with {:ok, data} <- MRF.filter(data) do
  1420. {:ok, object_to_user_data(data, additional)}
  1421. else
  1422. e -> {:error, e}
  1423. end
  1424. end
  1425. defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
  1426. with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
  1427. {:ok, data} <- user_data_from_user_object(data, additional) do
  1428. {:ok, maybe_update_follow_information(data)}
  1429. else
  1430. # If this has been deleted, only log a debug and not an error
  1431. {:error, "Object has been deleted" = e} ->
  1432. Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1433. {:error, e}
  1434. {:error, {:reject, reason} = e} ->
  1435. Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
  1436. {:error, e}
  1437. {:error, e} ->
  1438. Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
  1439. {:error, e}
  1440. end
  1441. end
  1442. def maybe_handle_clashing_nickname(data) do
  1443. with nickname when is_binary(nickname) <- data[:nickname],
  1444. %User{} = old_user <- User.get_by_nickname(nickname),
  1445. {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
  1446. Logger.info(
  1447. "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
  1448. )
  1449. old_user
  1450. |> User.remote_user_changeset(%{nickname: "#{old_user.id}.#{old_user.nickname}"})
  1451. |> User.update_and_set_cache()
  1452. else
  1453. {:ap_id_comparison, true} ->
  1454. Logger.info(
  1455. "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
  1456. )
  1457. _ ->
  1458. nil
  1459. end
  1460. end
  1461. def pin_data_from_featured_collection(%{
  1462. "type" => type,
  1463. "orderedItems" => objects
  1464. })
  1465. when type in ["OrderedCollection", "Collection"] do
  1466. Map.new(objects, fn
  1467. %{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
  1468. object_ap_id when is_binary(object_ap_id) -> {object_ap_id, NaiveDateTime.utc_now()}
  1469. end)
  1470. end
  1471. def pin_data_from_featured_collection(obj) do
  1472. Logger.error("Could not parse featured collection #{inspect(obj)}")
  1473. %{}
  1474. end
  1475. def fetch_and_prepare_featured_from_ap_id(nil) do
  1476. {:ok, %{}}
  1477. end
  1478. def fetch_and_prepare_featured_from_ap_id(ap_id) do
  1479. with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
  1480. {:ok, pin_data_from_featured_collection(data)}
  1481. else
  1482. e ->
  1483. Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
  1484. {:ok, %{}}
  1485. end
  1486. end
  1487. def pinned_fetch_task(nil), do: nil
  1488. def pinned_fetch_task(%{pinned_objects: pins}) do
  1489. if Enum.all?(pins, fn {ap_id, _} ->
  1490. Object.get_cached_by_ap_id(ap_id) ||
  1491. match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
  1492. end) do
  1493. :ok
  1494. else
  1495. :error
  1496. end
  1497. end
  1498. def make_user_from_ap_id(ap_id, additional \\ []) do
  1499. user = User.get_cached_by_ap_id(ap_id)
  1500. with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
  1501. {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
  1502. if user do
  1503. user
  1504. |> User.remote_user_changeset(data)
  1505. |> User.update_and_set_cache()
  1506. else
  1507. maybe_handle_clashing_nickname(data)
  1508. data
  1509. |> User.remote_user_changeset()
  1510. |> Repo.insert()
  1511. |> User.set_cache()
  1512. end
  1513. end
  1514. end
  1515. def make_user_from_nickname(nickname) do
  1516. with {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) <-
  1517. WebFinger.finger(nickname) do
  1518. make_user_from_ap_id(ap_id, nickname_from_acct: acct)
  1519. else
  1520. _e -> {:error, "No AP id in WebFinger"}
  1521. end
  1522. end
  1523. # filter out broken threads
  1524. defp contain_broken_threads(%Activity{} = activity, %User{} = user) do
  1525. entire_thread_visible_for_user?(activity, user)
  1526. end
  1527. # do post-processing on a specific activity
  1528. def contain_activity(%Activity{} = activity, %User{} = user) do
  1529. contain_broken_threads(activity, user)
  1530. end
  1531. def fetch_direct_messages_query do
  1532. Activity
  1533. |> restrict_type(%{type: "Create"})
  1534. |> restrict_visibility(%{visibility: "direct"})
  1535. |> order_by([activity], asc: activity.id)
  1536. end
  1537. defp maybe_restrict_deactivated_users(activity, %{type: "Flag"}), do: activity
  1538. defp maybe_restrict_deactivated_users(activity, _opts),
  1539. do: Activity.restrict_deactivated_users(activity)
  1540. end