module Agents
  class TwitterStreamAgent < Agent
    include TwitterConcern
    include LongRunnable

    cannot_receive_events!

    description <<~MD
      The Twitter Stream Agent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.

      #{twitter_dependencies_missing if dependencies_missing?}

      To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
      If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
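
      For example, a `filters` array with one alias group might look like this (the keywords are illustrative):

          [
            ["keyword1", "keyword1 synonym"],
            "keyword2"
          ]

      Tweets matching either "keyword1" or "keyword1 synonym" are then reported under the primary filter "keyword1".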

      To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.

      Set `include_retweets` to `true` to include retweets in the output (default: `false`).

      Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

      `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
    MD

    event_description <<~MD
      When in `counts` mode, TwitterStreamAgent events look like:

          {
            "filter": "hello world",
            "count": 25,
            "time": 3456785456
          }

      When in `events` mode, TwitterStreamAgent events look like:

      #{
        tweet_event_description('text', <<~MD)
          "filter": "selectorgadget",
        MD
      }
    MD

    default_schedule "11pm"

    def validate_options
      unless options[:filters].present? &&
          options[:expected_update_period_in_days].present? &&
          options[:generate].present?
        errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
      end

      if options[:include_retweets].present? && boolify(options[:include_retweets]).nil?
        errors.add(:base, "include_retweets must be a boolean value")
      end
    end

    def working?
      event_created_within?(interpolated[:expected_update_period_in_days]) && !recent_error_logs?
    end

    def default_options
      {
        'filters' => %w[keyword1 keyword2],
        'include_retweets' => false,
        'expected_update_period_in_days' => "2",
        'generate' => "events"
      }
    end

    def process_tweet(filter, status)
      filter = lookup_filter(filter)

      if filter
        if interpolated[:generate] == "counts"
          # Avoid memory pollution by reloading the Agent.
          agent = Agent.find(id)
          agent.memory[:filter_counts] ||= {}
          agent.memory[:filter_counts][filter] ||= 0
          agent.memory[:filter_counts][filter] += 1
          remove_unused_keys!(agent, 'filter_counts')
          agent.save!
        else
          create_event payload: status.merge('filter' => filter)
        end
      end
    end

    def check
      if interpolated[:generate] == "counts" && memory[:filter_counts].present?
        memory[:filter_counts].each do |filter, count|
          create_event payload: { 'filter' => filter, 'count' => count, 'time' => Time.now.to_i }
        end
      end

      memory[:filter_counts] = {}
    end

    protected
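
    # Resolves an incoming filter back to its primary term. To illustrate with hypothetical
    # filters [["rails", "ruby on rails"], "huginn"]: lookup_filter("ruby on rails") returns
    # "rails" (the first entry of its alias group), while lookup_filter("huginn") returns
    # "huginn" unchanged.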

    def lookup_filter(filter)
      interpolated[:filters].each do |known_filter|
        if known_filter == filter
          return filter
        elsif known_filter.is_a?(Array)
          if known_filter.include?(filter)
            return known_filter.first
          end
        end
      end
    end

    def remove_unused_keys!(agent, base)
      if agent.memory[base]
        (
          agent.memory[base].keys - agent.interpolated[:filters].map { |f|
            f.is_a?(Array) ? f.first.to_s : f.to_s
          }
        ).each do |removed_key|
          agent.memory[base].delete(removed_key)
        end
      end
    end

    def self.setup_worker
      Agents::TwitterStreamAgent.active.order(:id).group_by { |agent|
        agent.twitter_oauth_token
      }.map do |oauth_token, agents|
        if Agents::TwitterStreamAgent.dependencies_missing?
          warn Agents::TwitterStreamAgent.twitter_dependencies_missing
          STDERR.flush
          return false
        end

        filter_to_agent_map =
          agents.map { |agent|
            agent.options[:filters]
          }.flatten.uniq.compact.map(&:strip).each_with_object({}) { |f, m|
            m[f] = []
          }

        agents.each do |agent|
          agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
            filter_to_agent_map[filter] << agent
          end
        end

        config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] }
        config_hash.push(oauth_token)

        Worker.new(id: agents.first.worker_id(config_hash),
                   config: { filter_to_agent_map: },
                   agent: agents.first)
      end
    end
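
    # One Worker is created per Twitter OAuth token and receives the filter_to_agent_map built
    # above. For illustration (keywords and agent ids are hypothetical):
    #   filter_to_agent_map #=> { "keyword1" => [agent_1, agent_3], "keyword2" => [agent_2] }
    #   config_hash         #=> [["keyword1", [1, 3]], ["keyword2", [2]], "oauth-token"]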

    class Worker < LongRunnable::Worker
      RELOAD_TIMEOUT = 60.minutes
      DUPLICATE_DETECTION_LENGTH = 1000
      SEPARATOR = /[^\w-]+/

      def setup
        require 'twitter/json_stream'
        @filter_to_agent_map = @config[:filter_to_agent_map]
      end

      def run
        @recent_tweets = []
        EventMachine.run do
          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do
            restart!
          end

          stream!(@filter_to_agent_map.keys, @agent) do |status|
            handle_status(status)
          end
        end

        Thread.stop
      end

      def stop
        EventMachine.stop_event_loop if EventMachine.reactor_running?
        terminate_thread!
      end

      private

      def stream!(filters, agent, &block)
        track = filters.map(&:downcase).uniq.join(",")
        path =
          if track.present?
            "/1.1/statuses/filter.json?#{{ track: }.to_param}"
          else
            "/1.1/statuses/sample.json"
          end
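
        # For example (illustrative keywords), filters ["Rails", "huginn"] yield
        # track = "rails,huginn" and a path like "/1.1/statuses/filter.json?track=rails%2Chuginn";
        # with no filters, the general sample stream is used instead.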

        stream = Twitter::JSONStream.connect(
          path:,
          ssl: true,
          oauth: {
            consumer_key: agent.twitter_consumer_key,
            consumer_secret: agent.twitter_consumer_secret,
            access_key: agent.twitter_oauth_token,
            access_secret: agent.twitter_oauth_token_secret
          }
        )

        stream.each_item(&block)

        stream.on_error do |message|
          warn " --> Twitter error: #{message} at #{Time.now} <--"
          warn " --> Sleeping for 15 seconds"
          sleep 15
          restart!
        end

        stream.on_no_data do |_message|
          warn " --> Got no data for a while; trying to reconnect at #{Time.now} <--"
          restart!
        end

        stream.on_max_reconnects do |_timeout, _retries|
          warn " --> Oops, tried too many times! at #{Time.now} <--"
          sleep 60
          restart!
        end
      end

      def handle_status(status)
        status = JSON.parse(status, symbolize_names: true) if status.is_a?(String)
        status = TwitterConcern.format_tweet(status)

        return unless status && status[:text] && !status.has_key?(:delete)

        if status[:retweeted_status] && !boolify(agent.options[:include_retweets])
          return
        elsif @recent_tweets.include?(status[:id_str])
          puts "(#{Time.now}) Skipping duplicate tweet: #{status[:text]}"
          return
        end

        @recent_tweets << status[:id_str]
        @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH

        @filter_to_agent_map.keys.each do |filter|
          # A filter matches when every word in it also appears as a word in the tweet text.
          next unless (filter.downcase.split(SEPARATOR) - status[:text].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson

          @filter_to_agent_map[filter].each do |agent|
            puts "(#{Time.now}) #{agent.name} received: #{status[:text]}"
            AgentRunner.with_connection do
              agent.process_tweet(filter, status)
            end
          end
        end
      end
    end
  end
end