# websub.ex (10182B)
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Websub do
  @moduledoc """
  WebSub support, covering both sides of the protocol.

  Server side: accepts incoming subscription requests for local feeds
  (`incoming_subscription_request/2`), verifies them against the subscriber's
  callback (`verify/2`) and pushes signed Atom documents to active subscribers
  (`publish/3`, `publish_one/1`).

  Client side: subscribes to remote hubs (`subscribe/3`,
  `request_subscription/3`) and re-requests subscriptions that are about to
  expire (`refresh_subscriptions/1`).
  """

  alias Ecto.Changeset
  alias Pleroma.Activity
  alias Pleroma.HTTP
  alias Pleroma.Instances
  alias Pleroma.Repo
  alias Pleroma.User
  alias Pleroma.Web.ActivityPub.Visibility
  alias Pleroma.Web.Endpoint
  alias Pleroma.Web.Federator
  alias Pleroma.Web.Federator.Publisher
  alias Pleroma.Web.OStatus
  alias Pleroma.Web.OStatus.FeedRepresenter
  alias Pleroma.Web.Router.Helpers
  alias Pleroma.Web.Websub.WebsubClientSubscription
  alias Pleroma.Web.Websub.WebsubServerSubscription
  alias Pleroma.Web.XML
  require Logger

  import Ecto.Query

  @behaviour Pleroma.Web.Federator.Publisher

  @doc """
  Verifies a server-side subscription by challenging its callback URL.

  A random challenge is sent to the callback (query string stripped from the
  URL and merged into the request params instead) together with the topic,
  the lease duration and `hub.mode=subscribe`. `getter` is called as
  `getter.(url, [], params: params)` and defaults to `HTTP.get/3`.

  When the response body equals the challenge, the subscription's state is set
  to `"active"` and the result of `Repo.update/1` is returned. Otherwise the
  failure is logged at debug level and `{:error, subscription}` is returned.
  """
  def verify(subscription, getter \\ &HTTP.get/3) do
    challenge = Base.encode16(:crypto.strong_rand_bytes(8))

    lease_seconds =
      to_string(NaiveDateTime.diff(subscription.valid_until, subscription.updated_at))

    params = %{
      "hub.challenge": challenge,
      "hub.lease_seconds": lease_seconds,
      "hub.topic": subscription.topic,
      "hub.mode": "subscribe"
    }

    # The callback may carry its own query string; it is sent along as params.
    url = hd(String.split(subscription.callback, "?"))
    query = URI.parse(subscription.callback).query || ""
    params = Map.merge(params, URI.decode_query(query))

    with {:ok, response} <- getter.(url, [], params: params),
         ^challenge <- response.body do
      changeset = Changeset.change(subscription, %{state: "active"})
      Repo.update(changeset)
    else
      e ->
        Logger.debug("Couldn't verify subscription")
        Logger.debug(inspect(e))
        {:error, subscription}
    end
  end

  @supported_activities [
    "Create",
    "Follow",
    "Like",
    "Announce",
    "Undo",
    "Delete"
  ]

  @doc """
  Returns whether `activity` can be published over WebSub.

  Only activities whose `"type"` is one of the supported activity types are
  candidates; for those the answer is `Visibility.is_public?/1`. Anything else
  yields `false`.
  """
  def is_representable?(%Activity{data: %{"type" => type}} = activity)
      when type in @supported_activities,
      do: Visibility.is_public?(activity)

  def is_representable?(_), do: false

  @doc """
  Enqueues delivery of `activity` to every active, unexpired and reachable
  subscriber of `topic`.

  The activity is rendered once as an Atom document via `FeedRepresenter`, and
  one `Publisher.enqueue_one/2` job is queued per subscription. Activities of
  an unsupported type are ignored and `""` is returned.
  """
  def publish(topic, user, %{data: %{"type" => type}} = activity)
      when type in @supported_activities do
    response =
      user
      |> FeedRepresenter.to_simple_form([activity], [user])
      |> :xmerl.export_simple(:xmerl_xml)
      |> to_string

    query =
      from(
        sub in WebsubServerSubscription,
        where: sub.topic == ^topic and sub.state == "active",
        where: fragment("? > (NOW() at time zone 'UTC')", sub.valid_until)
      )

    subscriptions = Repo.all(query)

    callbacks = Enum.map(subscriptions, & &1.callback)
    # Map of reachable callback => value forwarded as `unreachable_since`.
    reachable_callbacks_metadata = Instances.filter_reachable(callbacks)

    subscriptions
    # Direct map lookup instead of scanning a key list for every subscription.
    |> Enum.filter(&Map.has_key?(reachable_callbacks_metadata, &1.callback))
    |> Enum.each(fn sub ->
      data = %{
        xml: response,
        topic: topic,
        callback: sub.callback,
        secret: sub.secret,
        unreachable_since: reachable_callbacks_metadata[sub.callback]
      }

      Publisher.enqueue_one(__MODULE__, data)
    end)
  end

  def publish(_, _, _), do: ""

  @doc """
  Publishes `activity` on the feed topic of `actor` (see `publish/3`).
  """
  def publish(actor, activity), do: publish(Pleroma.Web.OStatus.feed_path(actor), actor, activity)

  @doc """
  Returns the lowercase hex-encoded HMAC-SHA1 of `doc` (converted with
  `to_string/1`) keyed with `secret`.
  """
  def sign(secret, doc) do
    :crypto.hmac(:sha, secret, to_string(doc)) |> Base.encode16() |> String.downcase()
  end

  @doc """
  Handles a WebSub request sent to the feed of `user`.

  For `"hub.mode" => "subscribe"` the topic and lease time are validated, the
  subscription for the `{topic, callback}` pair is created or updated, its
  `valid_until` is set to `updated_at` plus the lease time and verification is
  handed to `Federator.verify_websub/1`. Returns `{:ok, subscription}`, or
  `{:error, reason}` when the topic or lease time is invalid.

  Any other request is logged and answered with
  `{:error, "Invalid WebSub request"}`.
  """
  def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) do
    with {:ok, topic} <- valid_topic(params, user),
         {:ok, lease_time} <- lease_time(params),
         secret <- params["hub.secret"],
         callback <- params["hub.callback"] do
      subscription = get_subscription(topic, callback)

      data = %{
        state: subscription.state || "requested",
        topic: topic,
        secret: secret,
        callback: callback
      }

      change = Changeset.change(subscription, data)
      websub = Repo.insert_or_update!(change)

      # Second write: `updated_at` is only read back after the first write.
      change =
        Changeset.change(websub, %{valid_until: NaiveDateTime.add(websub.updated_at, lease_time)})

      websub = Repo.update!(change)

      Federator.verify_websub(websub)

      {:ok, websub}
    else
      {:error, reason} ->
        Logger.debug("Couldn't create subscription")
        Logger.debug(inspect(reason))

        {:error, reason}
    end
  end

  def incoming_subscription_request(user, params) do
    Logger.info("Unhandled WebSub request for #{user.nickname}: #{inspect(params)}")

    {:error, "Invalid WebSub request"}
  end

  # Fetches the stored subscription for the topic/callback pair, or a blank
  # struct when none exists yet.
  defp get_subscription(topic, callback) do
    Repo.get_by(WebsubServerSubscription, topic: topic, callback: callback) ||
      %WebsubServerSubscription{}
  end

  # Three days, used when the subscriber does not request a lease time.
  @default_lease_seconds 60 * 60 * 24 * 3

  # Temp hack for mastodon.
  defp lease_time(%{"hub.lease_seconds" => ""}) do
    {:ok, @default_lease_seconds}
  end

  # The value comes straight from the remote request, so a malformed one is
  # reported as an error tuple instead of raising.
  defp lease_time(%{"hub.lease_seconds" => lease_seconds}) do
    with true <- is_binary(lease_seconds),
         {seconds, ""} <- Integer.parse(lease_seconds) do
      {:ok, seconds}
    else
      _ -> {:error, "Invalid hub.lease_seconds: #{inspect(lease_seconds)}"}
    end
  end

  defp lease_time(_) do
    {:ok, @default_lease_seconds}
  end

  # Only the user's own feed path is accepted as a topic.
  defp valid_topic(%{"hub.topic" => topic}, user) do
    expected = OStatus.feed_path(user)

    if topic == expected do
      {:ok, expected}
    else
      {:error, "Wrong topic requested, expected #{expected}, got #{topic}"}
    end
  end

  # A request without hub.topic is rejected instead of raising.
  defp valid_topic(_params, user) do
    {:error, "No topic requested, expected #{OStatus.feed_path(user)}"}
  end

  @doc """
  Subscribes `subscriber` to the feed of `subscribed`.

  When a client subscription for the topic already exists, the subscriber's
  `ap_id` is added to its (de-duplicated) subscriber list; otherwise a new
  subscription in state `"requested"` with a random secret is inserted. The
  resulting subscription is then passed to `requester`, which defaults to
  `request_subscription/1`, and its result is returned.
  """
  def subscribe(subscriber, subscribed, requester \\ &request_subscription/1) do
    topic = subscribed.info.topic
    # FIXME: Race condition, use transactions
    {:ok, subscription} =
      with subscription when not is_nil(subscription) <-
             Repo.get_by(WebsubClientSubscription, topic: topic) do
        subscribers = [subscriber.ap_id | subscription.subscribers] |> Enum.uniq()
        change = Ecto.Changeset.change(subscription, %{subscribers: subscribers})
        Repo.update(change)
      else
        _e ->
          subscription = %WebsubClientSubscription{
            topic: topic,
            hub: subscribed.info.hub,
            subscribers: [subscriber.ap_id],
            state: "requested",
            secret: :crypto.strong_rand_bytes(8) |> Base.url_encode64(),
            user: subscribed
          }

          Repo.insert(subscription)
      end

    requester.(subscription)
  end

  @doc """
  Fetches the feed at `topic` and extracts the author and hub data from it.

  `getter` is called with the topic and defaults to `HTTP.get/1`. Returns
  `{:ok, map}` with the string keys `"uri"`, `"hub"`, `"nickname"`, `"name"`,
  `"host"`, `"avatar"` and `"bio"` when the response status is in `200..299`
  and both the author URI and the hub link are present. Otherwise returns
  `{:error, e}`, where `e` is the value that failed to match.
  """
  def gather_feed_data(topic, getter \\ &HTTP.get/1) do
    with {:ok, response} <- getter.(topic),
         status when status in 200..299 <- response.status,
         body <- response.body,
         doc <- XML.parse_document(body),
         uri when not is_nil(uri) <- XML.string_from_xpath("/feed/author[1]/uri", doc),
         hub when not is_nil(hub) <- XML.string_from_xpath(~S{/feed/link[@rel="hub"]/@href}, doc) do
      name = XML.string_from_xpath("/feed/author[1]/name", doc)
      preferred_username = XML.string_from_xpath("/feed/author[1]/poco:preferredUsername", doc)
      display_name = XML.string_from_xpath("/feed/author[1]/poco:displayName", doc)
      avatar = OStatus.make_avatar_object(doc)
      bio = XML.string_from_xpath("/feed/author[1]/summary", doc)

      {:ok,
       %{
         "uri" => uri,
         "hub" => hub,
         "nickname" => preferred_username || name,
         "name" => display_name || name,
         "host" => URI.parse(uri).host,
         "avatar" => avatar,
         "bio" => bio
       }}
    else
      e ->
        {:error, e}
    end
  end

  @doc """
  Asks the hub of `websub` to subscribe us and waits for the confirmation.

  A background task polls the database once a second until the subscription
  is in state `"accepted"`. The subscription request is then posted to the hub
  with `poster` (defaults to `HTTP.post/3`). Returns `{:ok, subscription}` when
  the hub answers with status 202 and the confirmation shows up within
  `timeout` milliseconds. Otherwise the polling task is shut down, the
  subscription is stored as `"rejected"` and `{:error, subscription}` is
  returned.
  """
  def request_subscription(websub, poster \\ &HTTP.post/3, timeout \\ 10_000) do
    data = [
      "hub.mode": "subscribe",
      "hub.topic": websub.topic,
      "hub.secret": websub.secret,
      "hub.callback": Helpers.websub_url(Endpoint, :websub_subscription_confirmation, websub.id)
    ]

    # This checks once a second if we are confirmed yet
    websub_checker = fn ->
      helper = fn helper ->
        :timer.sleep(1000)
        websub = Repo.get_by(WebsubClientSubscription, id: websub.id, state: "accepted")
        if websub, do: websub, else: helper.(helper)
      end

      helper.(helper)
    end

    # Started before posting so the poller is already running when the hub
    # calls back.
    task = Task.async(websub_checker)

    with {:ok, %{status: 202}} <-
           poster.(websub.hub, {:form, data}, "Content-type": "application/x-www-form-urlencoded"),
         {:ok, websub} <- Task.yield(task, timeout) do
      {:ok, websub}
    else
      e ->
        Task.shutdown(task)

        change = Ecto.Changeset.change(websub, %{state: "rejected"})
        {:ok, websub} = Repo.update(change)

        Logger.debug(fn -> "Couldn't confirm subscription: #{inspect(websub)}" end)
        Logger.debug(fn -> "error: #{inspect(e)}" end)

        {:error, websub}
    end
  end

  @doc """
  Re-requests every client subscription whose `valid_until` lies before now
  plus `delta` seconds (one day by default).
  """
  def refresh_subscriptions(delta \\ 60 * 60 * 24) do
    Logger.debug("Refreshing subscriptions")

    cut_off = NaiveDateTime.add(NaiveDateTime.utc_now(), delta)

    query = from(sub in WebsubClientSubscription, where: sub.valid_until < ^cut_off)

    subs = Repo.all(query)

    Enum.each(subs, fn sub ->
      Federator.request_subscription(sub)
    end)
  end

  @doc """
  Pushes one signed Atom document to a subscriber callback.

  The body is `xml`, sent with an `X-Hub-Signature` header holding the
  HMAC-SHA1 of the document keyed with `secret` (an empty key when `secret` is
  `nil`). Returns `{:ok, code}` for a response status in `200..299` and
  `{:error, response}` otherwise.

  Reachability bookkeeping: on success the callback is marked reachable when
  the `:unreachable_since` key is absent or holds a truthy value; on failure
  the callback is marked unreachable unless `:unreachable_since` is already
  set.
  """
  def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
    signature = sign(secret || "", xml)
    Logger.info(fn -> "Pushing #{topic} to #{callback}" end)

    with {:ok, %{status: code}} when code in 200..299 <-
           HTTP.post(
             callback,
             xml,
             [
               {"Content-Type", "application/atom+xml"},
               {"X-Hub-Signature", "sha1=#{signature}"}
             ]
           ) do
      if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
        do: Instances.set_reachable(callback)

      Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
      {:ok, code}
    else
      {_post_result, response} ->
        # A failed push must flag the callback as unreachable; this branch
        # previously marked it reachable.
        # NOTE(review): assumes Instances.set_unreachable/1 exists — confirm.
        unless params[:unreachable_since], do: Instances.set_unreachable(callback)
        Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
        {:error, response}
    end
  end

  @doc """
  Returns the WebFinger link entries for `user`: the Atom feed link and the
  OStatus remote-subscribe template.
  """
  def gather_webfinger_links(%User{} = user) do
    [
      %{
        "rel" => "http://schemas.google.com/g/2010#updates-from",
        "type" => "application/atom+xml",
        "href" => OStatus.feed_path(user)
      },
      %{
        "rel" => "http://ostatus.org/schema/1.0/subscribe",
        "template" => OStatus.remote_follow_path()
      }
    ]
  end

  @doc """
  Returns the protocol names this publisher contributes to nodeinfo.
  """
  def gather_nodeinfo_protocol_names, do: ["ostatus"]
end