123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- require 'cgi'
- require 'json'
- require 'em-http-request'
- require 'pp'
# Long-running worker that connects Twitter stream agents to the Twitter
# streaming API through EventMachine and hands every matching tweet to the
# agents whose filters it satisfies.
#
# Typical use: `TwitterStream.new.run`; call #stop to make the loop exit.
class TwitterStream
  # Seconds between forced restarts of the event loop (and agent reloads).
  # Kept as a plain Integer so this file loads without ActiveSupport.
  RELOAD_TIMEOUT = 10 * 60

  # Maximum number of tweet ids remembered for duplicate detection.
  DUPLICATE_DETECTION_LENGTH = 5000

  # Anything that is not a word character, underscore or dash separates words
  # when comparing filters against tweet text.
  SEPARATOR = /[^\w_\-]+/

  def initialize
    @running = true
  end

  # Asks #run to leave its loop; the one-second timer inside the event loop
  # notices the flag and stops the reactor.
  def stop
    @running = false
  end

  # Opens one streaming connection and yields every usable status to +block+.
  #
  # filters - Array of track phrases; they are downcased and de-duplicated.
  #           An empty list selects the "sample" endpoint instead of "filter".
  # agent   - object supplying the four twitter_* OAuth credentials.
  # block   - called with each status Hash that has a 'text' and is not a
  #           delete notice. The text has &lt; / &gt; decoded and tabs and
  #           newlines replaced by spaces.
  #
  # Must be called while the EventMachine reactor is running (see #run).
  def stream!(filters, agent, &block)
    filters = filters.map(&:downcase).uniq

    stream = Twitter::JSONStream.connect(
      :path  => stream_path(filters),
      :ssl   => true,
      :oauth => {
        :consumer_key    => agent.twitter_consumer_key,
        :consumer_secret => agent.twitter_consumer_secret,
        :access_key      => agent.twitter_oauth_token,
        :access_secret   => agent.twitter_oauth_token_secret
      }
    )

    stream.each_item do |status|
      status = JSON.parse(status) if status.is_a?(String)
      # Ignore anything that is not a JSON object (nil, arrays, scalars).
      next unless status.is_a?(Hash)
      next if status.key?('delete')
      next unless status['text']

      status['text'] = clean_text(status['text'])
      block.call(status)
    end

    stream.on_error do |message|
      STDERR.puts " --> Twitter error: #{message} <--"
    end

    # Stopping the reactor returns control to #run, whose loop reconnects.
    stream.on_no_data do
      STDERR.puts " --> Got no data for awhile; trying to reconnect."
      EventMachine::stop_event_loop
    end

    stream.on_max_reconnects do |_timeout, _retries|
      STDERR.puts " --> Oops, tried too many times! <--"
      EventMachine::stop_event_loop
    end
  end

  # Opens one stream per distinct OAuth token and routes tweets to agents.
  #
  # agents        - agents responding to twitter_oauth_token, options, name
  #                 and process_tweet(filter, status).
  # recent_tweets - Array of recently seen tweet ids; mutated in place so the
  #                 caller can keep duplicate detection alive across reloads.
  def load_and_run(agents, recent_tweets)
    agents.group_by { |agent| agent.twitter_oauth_token }.each do |_oauth_token, grouped_agents|
      filter_to_agent_map = build_filter_map(grouped_agents)

      # All agents in a group share a token, so the first one's credentials
      # are used for the connection.
      stream!(filter_to_agent_map.keys, grouped_agents.first) do |status|
        handle_status(status, filter_to_agent_map, recent_tweets)
      end
    end
  end

  # Main loop: loads the active agents, runs the event loop until it is
  # stopped (reload timer, no data, too many reconnects, or #stop), then
  # starts over for as long as the worker is running.
  def run
    if Agents::TwitterStreamAgent.dependencies_missing?
      STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
      STDERR.flush
      return
    end

    # Loaded lazily, only after the dependency check above has passed.
    require 'twitter/json_stream'

    # Lives outside the loop so duplicates are still detected after a reload.
    recent_tweets = []

    while @running
      begin
        agents = Agents::TwitterStreamAgent.active.all

        EventMachine::run do
          # Poll the flag set by #stop once per second.
          EventMachine.add_periodic_timer(1) do
            EventMachine::stop_event_loop unless @running
          end

          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do
            puts "Reloading EventMachine and all Agents..."
            EventMachine::stop_event_loop
          end

          if agents.length.zero?
            puts "No agents found.  Will look again in a minute."
            EventMachine.add_timer(60) do
              EventMachine::stop_event_loop
            end
          else
            puts "Found #{agents.length} agent(s).  Loading them now..."
            load_and_run agents, recent_tweets
          end
        end
      rescue SignalException, SystemExit
        @running = false
        EventMachine::stop_event_loop if EventMachine.reactor_running?
      rescue StandardError => e
        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
        STDERR.puts "Waiting for a couple of minutes..."
        sleep 120
      end
    end
  end

  private

  # Builds the request path: the filter endpoint with an escaped, comma-joined
  # track list, or the sample endpoint when there is nothing to track.
  def stream_path(filters)
    return '/1/statuses/sample.json' if filters.empty?

    "/1/statuses/filter.json?track=#{filters.map { |f| CGI::escape(f) }.join(',')}"
  end

  # Decodes HTML-escaped angle brackets and flattens tabs/newlines to spaces.
  def clean_text(text)
    text.gsub('&lt;', '<').gsub('&gt;', '>').gsub(/[\t\n\r]/, ' ')
  end

  # Maps each stripped filter phrase to the agents interested in it, keeping
  # first-seen filter order. An agent appears at most once per filter.
  def build_filter_map(agents)
    agents.each_with_object({}) do |agent, map|
      normalized_filters(agent).each do |filter|
        (map[filter] ||= []) << agent
      end
    end
  end

  # Returns the agent's filters flattened, without nils, stripped and unique.
  # De-duplicating after stripping keeps "foo" and " foo " from counting twice.
  def normalized_filters(agent)
    Array(agent.options[:filters]).flatten.compact.map(&:strip).uniq
  end

  # Skips retweets and already-seen tweets; otherwise records the tweet id and
  # passes the status to every agent whose filter matches.
  def handle_status(status, filter_to_agent_map, recent_tweets)
    retweet = status['retweeted_status']
    return if retweet.is_a?(Hash) && !retweet.empty? # skip retweets

    if recent_tweets.include?(status['id_str'])
      puts "Skipping duplicate tweet: #{status["text"]}"
      return
    end

    recent_tweets << status['id_str']
    recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
    puts status['text']

    # Split the tweet once; every filter is compared against the same words.
    tweet_words = status['text'].downcase.split(SEPARATOR)

    filter_to_agent_map.each do |filter, interested_agents|
      next unless filter_matches?(filter, tweet_words)

      interested_agents.each do |agent|
        puts " -> #{agent.name}"
        agent.process_tweet(filter, status)
      end
    end
  end

  # True when every non-empty word of +filter+ occurs among +tweet_words+
  # (word order and adjacency are not checked).
  def filter_matches?(filter, tweet_words)
    (filter.downcase.split(SEPARATOR) - tweet_words).reject(&:empty?).empty?
  end
end
|