logo

pleroma

My custom branche(s) on git.pleroma.social/pleroma/pleroma

websub.ex (10182B)


      1 # Pleroma: A lightweight social networking server
      2 # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
      3 # SPDX-License-Identifier: AGPL-3.0-only
      4 
      5 defmodule Pleroma.Web.Websub do
      6   alias Ecto.Changeset
      7   alias Pleroma.Activity
      8   alias Pleroma.HTTP
      9   alias Pleroma.Instances
     10   alias Pleroma.Repo
     11   alias Pleroma.User
     12   alias Pleroma.Web.ActivityPub.Visibility
     13   alias Pleroma.Web.Endpoint
     14   alias Pleroma.Web.Federator
     15   alias Pleroma.Web.Federator.Publisher
     16   alias Pleroma.Web.OStatus
     17   alias Pleroma.Web.OStatus.FeedRepresenter
     18   alias Pleroma.Web.Router.Helpers
     19   alias Pleroma.Web.Websub.WebsubClientSubscription
     20   alias Pleroma.Web.Websub.WebsubServerSubscription
     21   alias Pleroma.Web.XML
     22   require Logger
     23 
     24   import Ecto.Query
     25 
     26   @behaviour Pleroma.Web.Federator.Publisher
     27 
     28   def verify(subscription, getter \\ &HTTP.get/3) do
     29     challenge = Base.encode16(:crypto.strong_rand_bytes(8))
     30     lease_seconds = NaiveDateTime.diff(subscription.valid_until, subscription.updated_at)
     31     lease_seconds = lease_seconds |> to_string
     32 
     33     params = %{
     34       "hub.challenge": challenge,
     35       "hub.lease_seconds": lease_seconds,
     36       "hub.topic": subscription.topic,
     37       "hub.mode": "subscribe"
     38     }
     39 
     40     url = hd(String.split(subscription.callback, "?"))
     41     query = URI.parse(subscription.callback).query || ""
     42     params = Map.merge(params, URI.decode_query(query))
     43 
     44     with {:ok, response} <- getter.(url, [], params: params),
     45          ^challenge <- response.body do
     46       changeset = Changeset.change(subscription, %{state: "active"})
     47       Repo.update(changeset)
     48     else
     49       e ->
     50         Logger.debug("Couldn't verify subscription")
     51         Logger.debug(inspect(e))
     52         {:error, subscription}
     53     end
     54   end
     55 
     56   @supported_activities [
     57     "Create",
     58     "Follow",
     59     "Like",
     60     "Announce",
     61     "Undo",
     62     "Delete"
     63   ]
     64 
     65   def is_representable?(%Activity{data: %{"type" => type}} = activity)
     66       when type in @supported_activities,
     67       do: Visibility.is_public?(activity)
     68 
     69   def is_representable?(_), do: false
     70 
     71   def publish(topic, user, %{data: %{"type" => type}} = activity)
     72       when type in @supported_activities do
     73     response =
     74       user
     75       |> FeedRepresenter.to_simple_form([activity], [user])
     76       |> :xmerl.export_simple(:xmerl_xml)
     77       |> to_string
     78 
     79     query =
     80       from(
     81         sub in WebsubServerSubscription,
     82         where: sub.topic == ^topic and sub.state == "active",
     83         where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
     84       )
     85 
     86     subscriptions = Repo.all(query)
     87 
     88     callbacks = Enum.map(subscriptions, & &1.callback)
     89     reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
     90     reachable_callbacks = Map.keys(reachable_callbacks_metadata)
     91 
     92     subscriptions
     93     |> Enum.filter(&(&1.callback in reachable_callbacks))
     94     |> Enum.each(fn sub ->
     95       data = %{
     96         xml: response,
     97         topic: topic,
     98         callback: sub.callback,
     99         secret: sub.secret,
    100         unreachable_since: reachable_callbacks_metadata[sub.callback]
    101       }
    102 
    103       Publisher.enqueue_one(__MODULE__, data)
    104     end)
    105   end
    106 
    107   def publish(_, _, _), do: ""
    108 
    109   def publish(actor, activity), do: publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)
    110 
    111   def sign(secret, doc) do
    112     :crypto.hmac(:sha, secret, to_string(doc)) |> Base.encode16() |> String.downcase()
    113   end
    114 
    115   def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) do
    116     with {:ok, topic} <- valid_topic(params, user),
    117          {:ok, lease_time} <- lease_time(params),
    118          secret <- params["hub.secret"],
    119          callback <- params["hub.callback"] do
    120       subscription = get_subscription(topic, callback)
    121 
    122       data = %{
    123         state: subscription.state || "requested",
    124         topic: topic,
    125         secret: secret,
    126         callback: callback
    127       }
    128 
    129       change = Changeset.change(subscription, data)
    130       websub = Repo.insert_or_update!(change)
    131 
    132       change =
    133         Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)})
    134 
    135       websub = Repo.update!(change)
    136 
    137       Federator.verify_websub(websub)
    138 
    139       {:ok, websub}
    140     else
    141       {:error, reason} ->
    142         Logger.debug("Couldn't create subscription")
    143         Logger.debug(inspect(reason))
    144 
    145         {:error, reason}
    146     end
    147   end
    148 
    149   def incoming_subscription_request(user, params) do
    150     Logger.info("Unhandled WebSub request for #{user.nickname}: #{inspect(params)}")
    151 
    152     {:error, "Invalid WebSub request"}
    153   end
    154 
    155   defp get_subscription(topic, callback) do
    156     Repo.get_by(WebsubServerSubscription, topic: topic, callback: callback) ||
    157       %WebsubServerSubscription{}
    158   end
    159 
    160   # Temp hack for mastodon.
    161   defp lease_time(%{"hub.lease_seconds" => ""}) do
    162     # three days
    163     {:ok, 60 * 60 * 24 * 3}
    164   end
    165 
    166   defp lease_time(%{"hub.lease_seconds" => lease_seconds}) do
    167     {:ok, String.to_integer(lease_seconds)}
    168   end
    169 
    170   defp lease_time(_) do
    171     # three days
    172     {:ok, 60 * 60 * 24 * 3}
    173   end
    174 
    175   defp valid_topic(%{"hub.topic" => topic}, user) do
    176     if topic == OStatus.feed_path(user) do
    177       {:ok, OStatus.feed_path(user)}
    178     else
    179       {:error, "Wrong topic requested, expected #{OStatus.feed_path(user)}, got #{topic}"}
    180     end
    181   end
    182 
    183   def subscribe(subscriber, subscribed, requester \\ &request_subscription/1) do
    184     topic = subscribed.info.topic
    185     # FIXME: Race condition, use transactions
    186     {:ok, subscription} =
    187       with subscription when not is_nil(subscription) <-
    188              Repo.get_by(WebsubClientSubscription, topic: topic) do
    189         subscribers = [subscriber.ap_id | subscription.subscribers] |> Enum.uniq()
    190         change = Ecto.Changeset.change(subscription, %{subscribers: subscribers})
    191         Repo.update(change)
    192       else
    193         _e ->
    194           subscription = %WebsubClientSubscription{
    195             topic: topic,
    196             hub: subscribed.info.hub,
    197             subscribers: [subscriber.ap_id],
    198             state: "requested",
    199             secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64(),
    200             user: subscribed
    201           }
    202 
    203           Repo.insert(subscription)
    204       end
    205 
    206     requester.(subscription)
    207   end
    208 
    209   def gather_feed_data(topic, getter \\ &HTTP.get/1) do
    210     with {:ok, response} <- getter.(topic),
    211          status when status in 200..299 <- response.status,
    212          body <- response.body,
    213          doc <- XML.parse_document(body),
    214          uri when not is_nil(uri) <- XML.string_from_xpath("/feed/author[1]/uri", doc),
    215          hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do
    216       name = XML.string_from_xpath("/feed/author[1]/name", doc)
    217       preferred_username = XML.string_from_xpath("/feed/author[1]/poco:preferredUsername", doc)
    218       display_name = XML.string_from_xpath("/feed/author[1]/poco:displayName", doc)
    219       avatar = OStatus.make_avatar_object(doc)
    220       bio = XML.string_from_xpath("/feed/author[1]/summary", doc)
    221 
    222       {:ok,
    223        %{
    224          "uri" => uri,
    225          "hub" => hub,
    226          "nickname" => preferred_username || name,
    227          "name" => display_name || name,
    228          "host" => URI.parse(uri).host,
    229          "avatar" => avatar,
    230          "bio" => bio
    231        }}
    232     else
    233       e ->
    234         {:error, e}
    235     end
    236   end
    237 
    238   def request_subscription(websub, poster \\ &HTTP.post/3, timeout \\ 10_000) do
    239     data = [
    240       "hub.mode": "subscribe",
    241       "hub.topic": websub.topic,
    242       "hub.secret": websub.secret,
    243       "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
    244     ]
    245 
    246     # This checks once a second if we are confirmed yet
    247     websub_checker = fn ->
    248       helper = fn helper ->
    249         :timer.sleep(1000)
    250         websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted")
    251         if websub, do: websub, else: helper.(helper)
    252       end
    253 
    254       helper.(helper)
    255     end
    256 
    257     task = Task.async(websub_checker)
    258 
    259     with {:ok, %{status: 202}} <-
    260            poster.(websub.hub, {:form, data}, "Content-type": "application/x-www-form-urlencoded"),
    261          {:ok, websub} <- Task.yield(task, timeout) do
    262       {:ok, websub}
    263     else
    264       e ->
    265         Task.shutdown(task)
    266 
    267         change = Ecto.Changeset.change(websub, %{state: "rejected"})
    268         {:ok, websub} = Repo.update(change)
    269 
    270         Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(websub)}" end)
    271         Logger.debug(fn -> "error: #{inspect(e)}" end)
    272 
    273         {:error, websub}
    274     end
    275   end
    276 
    277   def refresh_subscriptions(delta \\ 60 * 60 * 24) do
    278     Logger.debug("Refreshing subscriptions")
    279 
    280     cut_off = NaiveDateTime.add(NaiveDateTime.utc_now(), delta)
    281 
    282     query = from(sub in WebsubClientSubscription, where: sub.valid_until < ^cut_off)
    283 
    284     subs = Repo.all(query)
    285 
    286     Enum.each(subs, fn sub ->
    287       Federator.request_subscription(sub)
    288     end)
    289   end
    290 
    291   def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
    292     signature = sign(secret || "", xml)
    293     Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
    294 
    295     with {:ok, %{status: code}} when code in 200..299 <-
    296            HTTP.post(
    297              callback,
    298              xml,
    299              [
    300                {"Content-Type", "application/atom+xml"},
    301                {"X-Hub-Signature", "sha1=#{signature}"}
    302              ]
    303            ) do
    304       if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
    305         do: Instances.set_reachable(callback)
    306 
    307       Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
    308       {:ok, code}
    309     else
    310       {_post_result, response} ->
    311         unless params[:unreachable_since], do: Instances.set_reachable(callback)
    312         Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
    313         {:error, response}
    314     end
    315   end
    316 
    317   def gather_webfinger_links(%User{} = user) do
    318     [
    319       %{
    320         "rel" => "http://schemas.google.com/g/2010#updates-from",
    321         "type" => "application/atom+xml",
    322         "href" => OStatus.feed_path(user)
    323       },
    324       %{
    325         "rel" => "http://ostatus.org/schema/1.0/subscribe",
    326         "template" => OStatus.remote_follow_path()
    327       }
    328     ]
    329   end
    330 
    331   def gather_nodeinfo_protocol_names, do: ["ostatus"]
    332 end