twitter_stream.rb 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. require 'cgi'
  2. require 'json'
  3. require 'em-http-request'
  4. require 'pp'
  # Streams tweets from Twitter's streaming API and hands each status to the
  # TwitterStreamAgents whose filters it satisfies.
  #
  # One stream is opened per distinct OAuth token. The EventMachine reactor is
  # stopped every RELOAD_TIMEOUT and rebuilt by the loop in #run, which
  # re-reads the active agents each time.
  class TwitterStream
    # How often the reactor is stopped so that #run reloads the active agents.
    RELOAD_TIMEOUT = 10.minutes
    # Number of recent tweet ids remembered to suppress duplicate deliveries.
    DUPLICATE_DETECTION_LENGTH = 5000
    # Word splitter used for filter matching: a filter matches a tweet when all
    # of the filter's words occur among the tweet's words.
    SEPARATOR = /[^\w_\-]+/

    def initialize
      @running = true
    end

    # Asks #run to finish. A 1-second periodic timer inside the reactor checks
    # this flag and stops the event loop; the outer loop then exits.
    def stop
      @running = false
    end

    # Opens a streaming connection through Twitter::JSONStream and yields every
    # status that carries text.
    #
    # @param filters [Array<String>, nil] track terms (downcased, de-duplicated);
    #   when empty the sample stream is used instead of the filter stream
    # @param agent [Object] supplies the OAuth consumer/access credentials
    # @yieldparam status [Hash] decoded tweet whose 'text' has had HTML entities
    #   decoded and tabs/newlines replaced by spaces
    def stream!(filters, agent, &block)
      filters = Array(filters).map(&:downcase).uniq

      stream = Twitter::JSONStream.connect(
        path: stream_path(filters),
        ssl: true,
        oauth: {
          consumer_key: agent.twitter_consumer_key,
          consumer_secret: agent.twitter_consumer_secret,
          access_key: agent.twitter_oauth_token,
          access_secret: agent.twitter_oauth_token_secret
        }
      )

      stream.each_item do |item|
        status = parse_status(item)
        next unless status.is_a?(Hash)
        next if status.key?('delete') # deletion notices carry no tweet
        next unless status['text']

        status['text'] = normalize_text(status['text'])
        block.call(status)
      end

      stream.on_error do |message|
        STDERR.puts " --> Twitter error: #{message} <--"
      end

      # Stopping the loop hands control back to #run, which reconnects.
      stream.on_no_data do |_message|
        STDERR.puts " --> Got no data for awhile; trying to reconnect."
        EventMachine.stop_event_loop
      end

      stream.on_max_reconnects do |_timeout, _retries|
        STDERR.puts " --> Oops, tried too many times! <--"
        EventMachine.stop_event_loop
      end
    end

    # Opens one stream per OAuth token shared by +agents+ and dispatches each
    # incoming tweet to the agents whose filters it matches.
    #
    # @param agents [Array] active TwitterStreamAgents
    # @param recent_tweets [Array<String>] rolling window of seen tweet ids,
    #   shared across reloads; mutated in place
    def load_and_run(agents, recent_tweets)
      agents.group_by(&:twitter_oauth_token).each_value do |token_agents|
        filter_to_agent_map = build_filter_to_agent_map(token_agents)
        stream!(filter_to_agent_map.keys, token_agents.first) do |status|
          dispatch_status(status, filter_to_agent_map, recent_tweets)
        end
      end
    end

    # Main loop: (re)loads active agents, runs the reactor until it is stopped
    # (reload timer, no agents, stream trouble or #stop), then repeats while
    # the worker is running.
    def run
      if Agents::TwitterStreamAgent.dependencies_missing?
        STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
        STDERR.flush
        return
      end

      # Required only after the dependency check above, so a missing gem is
      # reported instead of raising when this file is loaded.
      require 'twitter/json_stream'

      recent_tweets = []
      while @running
        begin
          agents = Agents::TwitterStreamAgent.active.all
          EventMachine.run do
            # Poll the stop flag so #stop takes effect within about a second.
            EventMachine.add_periodic_timer(1) do
              EventMachine.stop_event_loop unless @running
            end

            # Periodically stop the reactor so the loop above reloads agents.
            EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do
              puts "Reloading EventMachine and all Agents..."
              EventMachine.stop_event_loop
            end

            if agents.length.zero?
              puts "No agents found. Will look again in a minute."
              EventMachine.add_timer(60) { EventMachine.stop_event_loop }
            else
              puts "Found #{agents.length} agent(s). Loading them now..."
              load_and_run(agents, recent_tweets)
            end
          end
        rescue SignalException, SystemExit
          @running = false
          EventMachine.stop_event_loop if EventMachine.reactor_running?
        rescue StandardError => e
          STDERR.puts "\nException #{e.message}:\n#{Array(e.backtrace).join("\n")}\n\n"
          STDERR.puts "Waiting for a couple of minutes..."
          sleep 120
        end
      end
    end

    private

    # Streaming endpoint path: the sample stream when there are no filters,
    # otherwise the filter stream with CGI-escaped, comma-joined track terms.
    def stream_path(filters)
      return '/1/statuses/sample.json' if filters.empty?

      "/1/statuses/filter.json?track=#{filters.map { |f| CGI.escape(f) }.join(',')}"
    end

    # Decodes a raw stream item. Returns nil (after logging) for malformed JSON
    # so that a single bad item does not tear down every stream.
    def parse_status(item)
      item.is_a?(String) ? JSON.parse(item) : item
    rescue JSON::ParserError => e
      STDERR.puts " --> Skipping unparseable stream item: #{e.message} <--"
      nil
    end

    # Decodes the HTML entities Twitter escapes in tweet text and turns
    # tabs/newlines/carriage returns into spaces. '&amp;' is decoded last so
    # that an escaped literal such as "&amp;lt;" becomes "&lt;", not "<".
    def normalize_text(text)
      text.gsub('&lt;', '<').gsub('&gt;', '>').gsub('&amp;', '&').tr("\t\n\r", '   ')
    end

    # Maps each normalized filter to the agents that declared it, preserving
    # first-seen order of filters and never listing an agent twice per filter.
    def build_filter_to_agent_map(agents)
      agents.each_with_object({}) do |agent, map|
        normalized_filters(agent).each do |filter|
          (map[filter] ||= []) << agent
        end
      end
    end

    # An agent's filters: nil-safe, flattened, stripped, non-blank and unique.
    # Stripping happens before uniq so "foo" and "foo " collapse to one entry;
    # blank filters are dropped because they would match every tweet.
    def normalized_filters(agent)
      Array(agent.options[:filters]).flatten.compact.map(&:strip).reject(&:empty?).uniq
    end

    # Skips retweets and recently seen tweets, records the tweet id in the
    # rolling duplicate window, then calls process_tweet on every agent whose
    # filter matches the tweet text.
    def dispatch_status(status, filter_to_agent_map, recent_tweets)
      retweet = status["retweeted_status"]
      return if retweet.present? && retweet.is_a?(Hash) # skip retweets

      if recent_tweets.include?(status["id_str"])
        puts "Skipping duplicate tweet: #{status["text"]}"
        return
      end

      recent_tweets << status["id_str"]
      recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
      puts status["text"]

      # Split the tweet once; every filter is compared against the same words.
      text_words = status["text"].downcase.split(SEPARATOR)
      filter_to_agent_map.each do |filter, filter_agents|
        next unless filter_matches?(filter, text_words)

        filter_agents.each do |agent|
          puts " -> #{agent.name}"
          agent.process_tweet(filter, status)
        end
      end
    end

    # True when every (case-insensitive) word of +filter+ occurs in +text_words+.
    def filter_matches?(filter, text_words)
      (filter.downcase.split(SEPARATOR) - text_words).reject(&:empty?).empty?
    end
  end