commit: 11cbe49ffc2729f9d40588c2e270a89f328cd6dd
parent: dbe00a4156952cf0288828435ccda6bbba5d7a6f
Author: Eugen Rochko <eugen@zeonfederated.com>
Date: Tue, 8 Nov 2016 01:32:34 +0100
ProcessFeedService refactor
Diffstat:
1 file changed, 147 insertions(+), 195 deletions(-)
diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb
@@ -2,257 +2,209 @@ class ProcessFeedService < BaseService
ACTIVITY_NS = 'http://activitystrea.ms/spec/1.0/'.freeze
THREAD_NS = 'http://purl.org/syndication/thread/1.0'.freeze
- # Create local statuses from an Atom feed
- # @param [String] body Atom feed
- # @param [Account] account Account this feed belongs to
- # @return [Enumerable] created statuses
+ # Parses +body+ as XML, hands the document to update_author and then returns
+ # the result of process_entries for the given account.
def call(body, account)
xml = Nokogiri::XML(body)
- update_remote_profile_service.call(xml.at_xpath('/xmlns:feed/xmlns:author'), account) unless xml.at_xpath('/xmlns:feed').nil?
- xml.xpath('//xmlns:entry').reverse_each.map { |entry| process_entry(account, entry) }.compact
+
+ update_author(xml, account)
+ process_entries(xml, account)
end
private
- def process_entry(account, entry)
- return unless [:note, :comment, :activity].include? object_type(entry)
-
- status = Status.find_by(uri: activity_id(entry))
+ # Passes the feed-level author element and +account+ to
+ # UpdateRemoteProfileService. Does nothing (returns nil) when the document
+ # has no /feed root element.
+ def update_author(xml, account)
+ unless xml.at_xpath('/xmlns:feed').nil?
+ profile_updater = UpdateRemoteProfileService.new
+ author_node = xml.at_xpath('/xmlns:feed/xmlns:author')
+ profile_updater.call(author_node, account)
+ end
+ end
- # If we already have a post and the verb is now "delete", we gotta delete it and move on!
- if !status.nil? && verb(entry) == :delete
- delete_post!(status)
- return
- end
+ # Runs ProcessEntry over every entry element of the document, last entry
+ # first, and returns the non-nil results.
+ def process_entries(xml, account)
+ entries = xml.xpath('//xmlns:entry')
+ results = entries.reverse_each.map do |entry|
+ ProcessEntry.new.call(entry, account)
+ end
+ results.compact
+ end
- return unless status.nil?
+ class ProcessEntry
+ # Processes one feed entry (+xml+) on behalf of +account+.
+ # Returns nil when skip_unsupported_type? is true or the verb matches no
+ # branch below. :post and :share return the result of create_status;
+ # :delete returns the result of delete_status.
+ def call(xml, account)
+ @account = account
+ @xml = xml
- status = Status.new(uri: activity_id(entry), url: activity_link(entry), account: account, text: content(entry), created_at: published(entry), updated_at: updated(entry))
+ return if skip_unsupported_type?
- if verb(entry) == :share
- add_reblog!(entry, status)
- elsif verb(entry) == :post
- if thread_id(entry).nil?
- add_post!(entry, status)
- else
- add_reply!(entry, status)
+ case verb
+ when :post, :share
+ return create_status
+ when :delete
+ return delete_status
end
- else
- return
end
- # If we added a status, go through accounts it mentions and create respective relations
- # Also record all media attachments for the status and for the reblogged status if present
- unless status.new_record?
- record_remote_mentions(status, entry.xpath('./xmlns:link[@rel="mentioned"]'))
- record_remote_mentions(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:link[@rel="mentioned"]')) if status.reblog?
+ private
- if status.reblog?
- ProcessHashtagsService.new.call(status.reblog, entry.at_xpath('./activity:object', activity: ACTIVITY_NS).xpath('./xmlns:category').map { |category| category['term'] })
- else
- ProcessHashtagsService.new.call(status, entry.xpath('./xmlns:category').map { |category| category['term'] })
- end
+ # Builds (or looks up) the status described by this entry, attaches the
+ # reblogged status for :share entries, saves it and queues it for
+ # distribution. Returns the status, or nil when a status with this entry's
+ # id is already known, so re-fetched feeds are not distributed twice.
+ def create_status
+ # Entries that were already imported must not be saved and queued again
+ return unless find_status(id).nil?
+
+ Rails.logger.debug "Creating remote status #{id}"
+ status = status_from_xml(@xml)
- process_attachments(entry, status)
- process_attachments(entry.xpath('./activity:object', activity: ACTIVITY_NS), status.reblog) if status.reblog?
+ if verb == :share
+ # The reblogged status is embedded in the entry as activity:object.
+ # It has to be read from @xml: this class has no `xml` local or method.
+ original_status = status_from_xml(@xml.at_xpath('.//activity:object', activity: ACTIVITY_NS))
+ status.reblog = original_status
+ end
- Rails.logger.debug "Queuing remote status #{status.id} for distribution"
+ status.save!
+ Rails.logger.debug "Queuing remote status #{status.id} (#{id}) for distribution"
DistributionWorker.perform_async(status.id)
- return status
+ status
+ end
- end
- def record_remote_mentions(status, links)
- return if status.local?
+ # Removes our copy of the status this entry refers to, if we have one.
+ # Always returns nil.
+ def delete_status
+ Rails.logger.debug "Deleting remote status #{id}"
+ # The id comes from untrusted remote XML: restrict the lookup to the
+ # account that owns this feed so it cannot delete someone else's status.
+ status = Status.find_by(uri: id, account: @account)
+ RemoveStatusService.new.call(status) unless status.nil?
+ nil
+ end
- # Here we have to do a reverse lookup of local accounts by their URL!
- # It's not pretty at all! I really wish all these protocols sticked to
- # using acct:username@domain only! It would make things so much easier
- # and tidier
+ # True unless the entry has both a handled verb (:post, :share, :delete)
+ # and a handled object type (:activity, :note, :comment).
+ def skip_unsupported_type?
+ supported_verb = [:post, :share, :delete].include?(verb)
+ return true unless supported_verb
+ ![:activity, :note, :comment].include?(type)
+ end
- links.each do |mention_link|
- href_val = mention_link.attribute('href').value
+ # Returns the status for +entry+: the existing record when find_status knows
+ # its id, otherwise a newly created one. A new status takes the entry's
+ # author as account when an author element is present, else @account, and
+ # then has its thread, mentions, hashtags and media processed from the entry.
+ def status_from_xml(entry)
+ # Return early if status already exists in db
+ status = find_status(id(entry))
+ return status unless status.nil?
+
+ status = Status.create!({
+ uri: id(entry),
+ url: url(entry),
+ account: account?(entry) ? find_or_resolve_account(acct(entry)) : @account,
+ text: content(entry),
+ created_at: published(entry),
+ })
+
+ if thread?(entry)
+ # NOTE(review): assigned after create! and not saved in this method;
+ # confirm every caller persists the status afterwards.
+ status.thread = find_or_resolve_status(status, *thread(entry))
+ end
- next if href_val == 'http://activityschema.org/collection/public'
+ mentions_from_xml(status, entry)
+ hashtags_from_xml(status, entry)
+ media_from_xml(status, entry)
- href = Addressable::URI.parse(href_val)
+ status
+ end
- if TagManager.instance.local_domain?(href.host)
- # A local user is mentioned
- mentioned_account = Account.find_local(href.path.gsub('/users/', ''))
+ # Hands +acct+ to FollowRemoteAccountService and returns its result.
+ def find_or_resolve_account(acct)
+ resolver = FollowRemoteAccountService.new
+ resolver.call(acct)
+ end
- unless mentioned_account.nil?
- mentioned_account.mentions.where(status: status).first_or_create(status: status)
- NotificationMailer.mention(mentioned_account, status).deliver_later unless mentioned_account.blocking?(status.account)
- end
- else
- # What to do about remote user?
- # This is kinda dodgy because URLs could change, we don't index them
- mentioned_account = Account.find_by(url: href.to_s)
+ # Returns the status known under +uri+, or nil. When it is not known, a
+ # ThreadResolveWorker job is queued with +parent+'s id and +url+.
+ def find_or_resolve_status(parent, uri, url)
+ status = find_status(uri)
+ ThreadResolveWorker.perform_async(parent.id, url) if status.nil?
- if mentioned_account.nil?
- mentioned_account = FetchRemoteAccountService.new.call(href)
- end
+ status
+ end
- unless mentioned_account.nil?
- mentioned_account.mentions.where(status: status).first_or_create(status: status)
- end
+ # Looks up a status by +uri+ and returns it, or nil when there is none.
+ # URIs that TagManager recognises as local ids are resolved by primary key;
+ # everything else is matched against the uri column.
+ def find_status(uri)
+ if TagManager.instance.local_id?(uri)
+ local_id = TagManager.instance.unique_tag_to_local_id(uri, 'Status')
+ # find_by returns nil for a missing row; find would raise RecordNotFound,
+ # but every caller checks the result with nil?
+ return Status.find_by(id: local_id)
end
+
+ Status.find_by(uri: uri)
end
- end
- def process_attachments(entry, status)
- return if status.local?
+ # Records a mention on +parent+ for every account referenced by a
+ # rel="mentioned" link in +xml+. The public-collection URL is ignored.
+ # Links on a local domain are looked up with Account.find_local; others by
+ # url, falling back to FetchRemoteAccountService. Local accounts are mailed
+ # a notification unless they block the parent's account. Each account is
+ # processed at most once per call.
+ def mentions_from_xml(parent, xml)
+ processed_account_ids = []
- entry.xpath('./xmlns:link[@rel="enclosure"]').each do |enclosure_link|
- next if enclosure_link.attribute('href').nil?
+ xml.xpath('./xmlns:link[@rel="mentioned"]').each do |link|
+ next if link['href'] == 'http://activityschema.org/collection/public'
- media = MediaAttachment.where(status: status, remote_url: enclosure_link.attribute('href').value).first
+ url = Addressable::URI.parse(link['href'])
- next unless media.nil?
+ mentioned_account = if TagManager.instance.local_domain?(url.host)
+ Account.find_local(url.path.gsub('/users/', ''))
+ else
+ Account.find_by(url: link['href']) || FetchRemoteAccountService.new.call(link['href'])
+ end
- begin
- media = MediaAttachment.new(account: status.account, status: status, remote_url: enclosure_link.attribute('href').value)
- media.file_remote_url = enclosure_link.attribute('href').value
- media.save
- rescue Paperclip::Errors::NotIdentifiedByImageMagickError
- Rails.logger.debug "Error saving attachment from #{enclosure_link.attribute('href').value}"
- next
- end
- end
- end
+ next if mentioned_account.nil? || processed_account_ids.include?(mentioned_account.id)
- def add_post!(_entry, status)
- status.save!
- end
+ if mentioned_account.local?
+ # Send notifications
+ NotificationMailer.mention(mentioned_account, parent).deliver_later unless mentioned_account.blocking?(parent.account)
+ end
- def add_reblog!(entry, status)
- status.reblog = find_original_status(entry, target_id(entry))
+ mentioned_account.mentions.where(status: parent).first_or_create(status: parent)
- if status.reblog.nil?
- status.reblog = fetch_remote_status(entry)
- end
-
- if !status.reblog.nil?
- status.save!
- NotificationMailer.reblog(status.reblog, status.account).deliver_later if status.reblog.local? && !status.reblog.account.blocking?(status.account)
+ # So we can skip duplicate mentions
+ processed_account_ids << mentioned_account.id
+ end
end
- end
- def add_reply!(entry, status)
- status.thread = find_original_status(entry, thread_id(entry))
- status.save!
-
- if status.thread.nil? && !thread_href(entry).nil?
- ThreadResolveWorker.perform_async(status.id, thread_href(entry))
+ # Collects the term attribute of every category child of +xml+ and passes
+ # the list, together with +parent+, to ProcessHashtagsService.
+ def hashtags_from_xml(parent, xml)
+ categories = xml.xpath('./xmlns:category')
+ tags = categories.map { |node| node['term'] }
+ ProcessHashtagsService.new.call(parent, tags)
end
- end
- def delete_post!(status)
- remove_status_service.call(status)
- end
+ # For every rel="enclosure" link in +xml+ that has an href, finds or
+ # initializes the MediaAttachment of +parent+ with that remote URL, points
+ # file_remote_url at the href and saves it. A link whose file raises
+ # Paperclip::Errors::NotIdentifiedByImageMagickError is skipped.
+ def media_from_xml(parent, xml)
+ xml.xpath('./xmlns:link[@rel="enclosure"]').each do |link|
+ next unless link['href']
- def find_original_status(_xml, id)
- return nil if id.nil?
+ media = MediaAttachment.where(status: parent, remote_url: link['href']).first_or_initialize(account: parent.account, status: parent, remote_url: link['href'])
- if TagManager.instance.local_id?(id)
- Status.find(TagManager.instance.unique_tag_to_local_id(id, 'Status'))
- else
- Status.find_by(uri: id)
+ begin
+ media.file_remote_url = link['href']
+ media.save
+ rescue Paperclip::Errors::NotIdentifiedByImageMagickError
+ next
+ end
+ end
end
- end
-
- def fetch_remote_status(xml)
- username = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:name').content
- url = xml.at_xpath('./activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:author/xmlns:uri').content
- domain = Addressable::URI.parse(url).host
- account = Account.find_remote(username, domain)
- if account.nil?
- account = follow_remote_account_service.call("#{username}@#{domain}")
+ # Text content of the node's id child element.
+ def id(xml = @xml)
+ id_node = xml.at_xpath('./xmlns:id')
+ id_node.content
end
- status = Status.new(account: account, uri: target_id(xml), text: target_content(xml), url: target_url(xml), created_at: published(xml), updated_at: updated(xml))
- status.thread = find_original_status(xml, thread_id(xml))
-
- if status.save && status.thread.nil? && !thread_href(xml).nil?
- ThreadResolveWorker.perform_async(status.id, thread_href(xml))
+ # The node's activity:verb as a symbol, with the two schema URL prefixes
+ # removed. Any error while reading it yields :post.
+ def verb(xml = @xml)
+ verb_node = xml.at_xpath('./activity:verb', activity: ACTIVITY_NS)
+ schema_prefixes = ['http://activitystrea.ms/schema/1.0/', 'http://ostatus.org/schema/1.0/']
+ schema_prefixes.reduce(verb_node.content) { |name, prefix| name.gsub(prefix, '') }.to_sym
+ rescue
+ :post
end
- status
- rescue Goldfinger::Error, HTTP::Error
- nil
- end
-
- def published(xml)
- xml.at_xpath('./xmlns:published').content
- end
-
- def updated(xml)
- xml.at_xpath('./xmlns:updated').content
- end
-
- def content(xml)
- xml.at_xpath('./xmlns:content').try(:content)
- end
-
- def thread_id(xml)
- xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('ref').value
- rescue
- nil
- end
-
- def thread_href(xml)
- xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS).attribute('href').value
- rescue
- nil
- end
-
- def target_id(xml)
- xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:id').content
- rescue
- nil
- end
-
- def activity_id(xml)
- xml.at_xpath('./xmlns:id').content
- end
+ # The node's activity:object-type as a symbol, with the two schema URL
+ # prefixes removed. Any error while reading it yields :activity.
+ def type(xml = @xml)
+ type_node = xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS)
+ without_activity_schema = type_node.content.gsub('http://activitystrea.ms/schema/1.0/', '')
+ without_schemas = without_activity_schema.gsub('http://ostatus.org/schema/1.0/', '')
+ without_schemas.to_sym
+ rescue
+ :activity
+ end
- def activity_link(xml)
- xml.at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value
- rescue
- ''
- end
+ # href of the node's rel="alternate" link, or '' when the node has no such
+ # link or the link has no href.
+ def url(xml = @xml)
+ link = xml.at_xpath('./xmlns:link[@rel="alternate"]')
+ # at_xpath returns nil when nothing matches; an entry without an alternate
+ # link must not abort processing of the whole feed.
+ (link && link['href']) || ''
+ end
- def target_content(xml)
- xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:content').content
- end
+ # Text content of the node's content child element.
+ def content(xml = @xml)
+ content_node = xml.at_xpath('./xmlns:content')
+ content_node.content
+ end
- def target_url(xml)
- xml.at_xpath('.//activity:object', activity: ACTIVITY_NS).at_xpath('./xmlns:link[@rel="alternate"]').attribute('href').value
- end
+ # Text content of the node's published child element.
+ def published(xml = @xml)
+ published_node = xml.at_xpath('./xmlns:published')
+ published_node.content
+ end
- def object_type(xml)
- xml.at_xpath('./activity:object-type', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym
- rescue
- :activity
- end
+ # Whether the node has a thr:in-reply-to child element.
+ def thread?(xml = @xml)
+ reply_ref = xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS)
+ !reply_ref.nil?
+ end
- def verb(xml)
- xml.at_xpath('./activity:verb', activity: ACTIVITY_NS).content.gsub('http://activitystrea.ms/schema/1.0/', '').gsub('http://ostatus.org/schema/1.0/', '').to_sym
- rescue
- :post
- end
+ # The [ref, href] attribute pair of the node's thr:in-reply-to element.
+ def thread(xml = @xml)
+ reply_ref = xml.at_xpath('./thr:in-reply-to', thr: THREAD_NS)
+ %w(ref href).map { |attribute| reply_ref[attribute] }
+ end
- def follow_remote_account_service
- @follow_remote_account_service ||= FollowRemoteAccountService.new
- end
+ # Whether the node has an author child element.
+ def account?(xml = @xml)
+ author_node = xml.at_xpath('./xmlns:author')
+ !author_node.nil?
+ end
- def update_remote_profile_service
- @update_remote_profile_service ||= UpdateRemoteProfileService.new
- end
+ # Builds a "username@domain" string from the node's author element: the
+ # username is the author's name, the domain is the host of the author's uri.
+ def acct(xml = @xml)
+ username = xml.at_xpath('./xmlns:author/xmlns:name').content
+ url = xml.at_xpath('./xmlns:author/xmlns:uri').content
+ domain = Addressable::URI.parse(url).host
- def remove_status_service
- @remove_status_service ||= RemoveStatusService.new
+ "#{username}@#{domain}"
+ end
end
end