require 'cgi' require 'json' require 'em-http-request' require 'pp' class TwitterStream def initialize @running = true end def stop @running = false end def stream!(filters, agent, &block) filters = filters.map(&:downcase).uniq stream = Twitter::JSONStream.connect( :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}", :ssl => true, :oauth => { :consumer_key => agent.twitter_consumer_key, :consumer_secret => agent.twitter_consumer_secret, :access_key => agent.twitter_oauth_token, :access_secret => agent.twitter_oauth_token_secret } ) stream.each_item do |status| status = JSON.parse(status) if status.is_a?(String) next unless status next if status.has_key?('delete') next unless status['text'] status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ') block.call(status) end stream.on_error do |message| STDERR.puts " --> Twitter error: #{message} <--" end stream.on_no_data do |message| STDERR.puts " --> Got no data for awhile; trying to reconnect." EventMachine::stop_event_loop end stream.on_max_reconnects do |timeout, retries| STDERR.puts " --> Oops, tried too many times! <--" EventMachine::stop_event_loop end end def load_and_run(agents, recent_tweets) agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents| filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m } agents.each do |agent| agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| filter_to_agent_map[filter] << agent end end stream!(filter_to_agent_map.keys, agents.first) do |status| if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) # skip retweets elsif recent_tweets.include?(status["id_str"]) puts "Skipping duplicate tweet: #{status["text"]}" else recent_tweets << status["id_str"] recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH puts status["text"] filter_to_agent_map.keys.each do |filter| if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson filter_to_agent_map[filter].each do |agent| puts " -> #{agent.name}" agent.process_tweet(filter, status) end end end end end end end RELOAD_TIMEOUT = 10.minutes DUPLICATE_DETECTION_LENGTH = 5000 SEPARATOR = /[^\w_\-]+/ def run if Agents::TwitterStreamAgent.dependencies_missing? STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing STDERR.flush return end require 'twitter/json_stream' recent_tweets = [] while @running begin agents = Agents::TwitterStreamAgent.active.all EventMachine::run do EventMachine.add_periodic_timer(1) { EventMachine::stop_event_loop if !@running } EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { puts "Reloading EventMachine and all Agents..." EventMachine::stop_event_loop } if agents.length == 0 puts "No agents found. Will look again in a minute." EventMachine.add_timer(60) { EventMachine::stop_event_loop } else puts "Found #{agents.length} agent(s). Loading them now..." load_and_run agents, recent_tweets end end rescue SignalException, SystemExit @running = false EventMachine::stop_event_loop if EventMachine.reactor_running? rescue StandardError => e STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" STDERR.puts "Waiting for a couple of minutes..." sleep 120 end end end end