module Agents class TwitterStreamAgent < Agent include TwitterConcern include LongRunnable cannot_receive_events! description <<~MD The Twitter Stream Agent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide. #{twitter_dependencies_missing if dependencies_missing?} To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order. If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases. To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first. Set `include_retweets` to `true` to not include retweets (default: `false`) Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent. `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled. MD event_description <<~MD When in `counts` mode, TwitterStreamAgent events look like: { "filter": "hello world", "count": 25, "time": 3456785456 } When in `events` mode, TwitterStreamAgent events look like: #{ tweet_event_description('text', <<~MD) "filter": "selectorgadget", MD } MD default_schedule "11pm" def validate_options unless options[:filters].present? && options[:expected_update_period_in_days].present? && options[:generate].present? errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields") end if options[:include_retweets].present? && boolify(options[:include_retweets]).nil? errors.add(:base, "include_retweets must be a boolean value") end end def working? event_created_within?(interpolated[:expected_update_period_in_days]) && !recent_error_logs? end def default_options { 'filters' => %w[keyword1 keyword2], 'include_retweets' => false, 'expected_update_period_in_days' => "2", 'generate' => "events" } end def process_tweet(filter, status) filter = lookup_filter(filter) if filter if interpolated[:generate] == "counts" # Avoid memory pollution by reloading the Agent. agent = Agent.find(id) agent.memory[:filter_counts] ||= {} agent.memory[:filter_counts][filter] ||= 0 agent.memory[:filter_counts][filter] += 1 remove_unused_keys!(agent, 'filter_counts') agent.save! else create_event payload: status.merge('filter' => filter) end end end def check if interpolated[:generate] == "counts" && memory[:filter_counts].present? memory[:filter_counts].each do |filter, count| create_event payload: { 'filter' => filter, 'count' => count, 'time' => Time.now.to_i } end end memory[:filter_counts] = {} end protected def lookup_filter(filter) interpolated[:filters].each do |known_filter| if known_filter == filter return filter elsif known_filter.is_a?(Array) if known_filter.include?(filter) return known_filter.first end end end end def remove_unused_keys!(agent, base) if agent.memory[base] ( agent.memory[base].keys - agent.interpolated[:filters].map { |f| f.is_a?(Array) ? f.first.to_s : f.to_s } ).each do |removed_key| agent.memory[base].delete(removed_key) end end end def self.setup_worker Agents::TwitterStreamAgent.active.order(:id).group_by { |agent| agent.twitter_oauth_token }.map do |oauth_token, agents| if Agents::TwitterStreamAgent.dependencies_missing? warn Agents::TwitterStreamAgent.twitter_dependencies_missing STDERR.flush return false end filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).each_with_object({}) { |f, m| m[f] = [] } agents.each do |agent| agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| filter_to_agent_map[filter] << agent end end config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] } config_hash.push(oauth_token) Worker.new(id: agents.first.worker_id(config_hash), config: { filter_to_agent_map: }, agent: agents.first) end end class Worker < LongRunnable::Worker RELOAD_TIMEOUT = 60.minutes DUPLICATE_DETECTION_LENGTH = 1000 SEPARATOR = /[^\w-]+/ def setup require 'twitter/json_stream' @filter_to_agent_map = @config[:filter_to_agent_map] end def run @recent_tweets = [] EventMachine.run do EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do restart! end stream!(@filter_to_agent_map.keys, @agent) do |status| handle_status(status) end end Thread.stop end def stop EventMachine.stop_event_loop if EventMachine.reactor_running? terminate_thread! end private def stream!(filters, agent, &block) track = filters.map(&:downcase).uniq.join(",") path = if track.present? "/1.1/statuses/filter.json?#{{ track: }.to_param}" else "/1.1/statuses/sample.json" end stream = Twitter::JSONStream.connect( path:, ssl: true, oauth: { consumer_key: agent.twitter_consumer_key, consumer_secret: agent.twitter_consumer_secret, access_key: agent.twitter_oauth_token, access_secret: agent.twitter_oauth_token_secret } ) stream.each_item(&block) stream.on_error do |message| warn " --> Twitter error: #{message} at #{Time.now} <--" warn " --> Sleeping for 15 seconds" sleep 15 restart! end stream.on_no_data do |_message| warn " --> Got no data for awhile; trying to reconnect at #{Time.now} <--" restart! end stream.on_max_reconnects do |_timeout, _retries| warn " --> Oops, tried too many times! at #{Time.now} <--" sleep 60 restart! end end def handle_status(status) status = JSON.parse(status, symbolize_names: true) if status.is_a?(String) status = TwitterConcern.format_tweet(status) return unless status && status[:text] && !status.has_key?(:delete) if status[:retweeted_status] && !boolify(agent.options[:include_retweets]) return elsif @recent_tweets.include?(status[:id_str]) puts "(#{Time.now}) Skipping duplicate tweet: #{status[:text]}" return end @recent_tweets << status[:id_str] @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH @filter_to_agent_map.keys.each do |filter| next unless (filter.downcase.split(SEPARATOR) - status[:text].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson @filter_to_agent_map[filter].each do |agent| puts "(#{Time.now}) #{agent.name} received: #{status[:text]}" AgentRunner.with_connection do agent.process_tweet(filter, status) end end end end end end end