# twitter_stream_agent.rb (8.2 KB)

  module Agents
    # Follows Twitter's streaming API for the configured `filters` and either
    # creates one Event per matching tweet (`generate: events`) or accumulates
    # per-filter counters that are flushed as summary Events on each scheduled
    # `check` (`generate: counts`).
    class TwitterStreamAgent < Agent
      include TwitterConcern
      include LongRunnable

      cannot_receive_events!

      description <<-MD
        The Twitter Stream Agent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.

        #{twitter_dependencies_missing if dependencies_missing?}

        To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
        If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.

        To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.

        Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

        `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
      MD

      event_description <<-MD
        When in `counts` mode, TwitterStreamAgent events look like:

            {
              "filter": "hello world",
              "count": 25,
              "time": 3456785456
            }

        When in `events` mode, TwitterStreamAgent events look like:

            {
              "filter": "selectorgadget",
              ... every Tweet field, including ...
              "text": "something",
              "user": {
                "name": "Mr. Someone",
                "screen_name": "Someone",
                "location": "Vancouver BC Canada",
                "description": "...",
                "followers_count": 486,
                "friends_count": 1983,
                "created_at": "Mon Aug 29 23:38:14 +0000 2011",
                "time_zone": "Pacific Time (US & Canada)",
                "statuses_count": 3807,
                "lang": "en"
              },
              "retweet_count": 0,
              "entities": ...
              "lang": "en"
            }
      MD

      default_schedule "11pm"

      # Adds a base error unless `filters`, `expected_update_period_in_days`
      # and `generate` are all present.
      def validate_options
        unless options['filters'].present? &&
               options['expected_update_period_in_days'].present? &&
               options['generate'].present?
          errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
        end
      end

      # Healthy when an Event was created within the expected period and no
      # recent error logs exist.
      def working?
        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
      end

      def default_options
        {
          'filters' => %w[keyword1 keyword2],
          'expected_update_period_in_days' => "2",
          'generate' => "events"
        }
      end

      # Called by the Worker for every tweet whose text matched +filter+.
      #
      # @param filter [String] the stripped filter string the tweet matched
      # @param status [Hash] the decoded tweet
      # Tweets whose filter is no longer configured on this agent are ignored.
      def process_tweet(filter, status)
        filter = lookup_filter(filter)
        return unless filter

        if interpolated['generate'] == "counts"
          # Avoid memory pollution by reloading the Agent.
          agent = Agent.find(id)
          agent.memory['filter_counts'] ||= {}
          # Read the hash back instead of using the value of `||=`, in case the
          # memory store converts assigned hashes (e.g. indifferent access) —
          # NOTE(review): confirm against the Agent memory serializer.
          counts = agent.memory['filter_counts']
          counts[filter] = (counts[filter] || 0) + 1
          remove_unused_keys!(agent, 'filter_counts')
          agent.save!
        else
          create_event payload: status.merge('filter' => filter)
        end
      end

      # In counts mode, emits one summary Event per filter counted since the
      # previous run (all stamped with the same time); always resets counters.
      def check
        counts = memory['filter_counts']
        if interpolated['generate'] == "counts" && counts.present?
          now = Time.now.to_i
          counts.each do |filter, count|
            create_event payload: { 'filter' => filter, 'count' => count, 'time' => now }
          end
        end
        memory['filter_counts'] = {}
      end

      protected

      # Maps a matched (stripped) filter back to its configured primary form.
      #
      # Returns +filter+ for a plain entry, the stripped first element for an
      # alias array, or nil when no configured entry matches. The explicit nil
      # matters: falling off `each` would return the (truthy) filters Array.
      def lookup_filter(filter)
        Array(interpolated['filters']).each do |known_filter|
          if known_filter.is_a?(Array)
            if known_filter.any? { |alias_filter| alias_filter.to_s.strip == filter }
              return known_filter.first.to_s.strip
            end
          elsif known_filter.to_s.strip == filter
            return filter
          end
        end
        nil
      end

      # Deletes entries of agent.memory[base] whose key is not the (stripped)
      # primary form of a currently configured filter.
      def remove_unused_keys!(agent, base)
        counts = agent.memory[base]
        return unless counts

        configured = Array(agent.interpolated['filters']).map do |known_filter|
          (known_filter.is_a?(Array) ? known_filter.first : known_filter).to_s.strip
        end
        (counts.keys - configured).each { |key| counts.delete(key) }
      end

      # Builds one Worker per distinct OAuth token among active agents, sharing a
      # single stream across all agents that use that token.
      #
      # Returns an Array of Workers, or false when the Twitter streaming
      # dependencies are missing.
      def self.setup_worker
        Agents::TwitterStreamAgent.active.order(:id).group_by(&:twitter_oauth_token).map do |oauth_token, agents|
          if Agents::TwitterStreamAgent.dependencies_missing?
            STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
            STDERR.flush
            return false
          end

          # filter => agents tracking it. Strip *before* de-duplicating so an agent
          # listing both "foo" and " foo" is registered (and notified) only once.
          filter_to_agent_map = {}
          agents.each do |agent|
            filters = Array(agent.options[:filters]).flatten.compact.map { |f| f.to_s.strip }.reject(&:empty?).uniq
            filters.each do |filter|
              (filter_to_agent_map[filter] ||= []) << agent
            end
          end

          # Without filters the stream would fall back to the unfiltered sample
          # endpoint while dispatching nothing, so skip this group entirely.
          next if filter_to_agent_map.empty?

          config_hash = filter_to_agent_map.map { |filter, filter_agents| [filter, filter_agents.map(&:id)] }
          config_hash.push(oauth_token)

          Worker.new(id: agents.first.worker_id(config_hash),
                     config: { filter_to_agent_map: filter_to_agent_map },
                     agent: agents.first)
        end.compact
      end

      # Long-running worker that opens one streaming connection (with the
      # credentials of its first agent) and dispatches tweets to every agent
      # whose filter words all appear in the tweet text.
      class Worker < LongRunnable::Worker
        RELOAD_TIMEOUT = 60.minutes
        DUPLICATE_DETECTION_LENGTH = 1000
        # Anything other than word characters or '-' separates words.
        SEPARATOR = /[^\w\-]+/

        def setup
          require 'twitter/json_stream'
          @filter_to_agent_map = @config[:filter_to_agent_map]
        end

        def run
          @recent_tweets = []
          EventMachine.run do
            # Force a reconnect every RELOAD_TIMEOUT.
            EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { restart! }
            stream!(@filter_to_agent_map.keys, @agent) { |status| handle_status(status) }
          end
          Thread.stop
        end

        def stop
          EventMachine.stop_event_loop if EventMachine.reactor_running?
          terminate_thread!
        end

        private

        # Connects to statuses/filter (tracking +filters+) or, when there are no
        # filters, to statuses/sample, yielding each raw item to +block+.
        def stream!(filters, agent, &block)
          filters = filters.map(&:downcase).uniq
          path =
            if filters.empty?
              "/1.1/statuses/sample.json"
            else
              "/1.1/statuses/filter.json?track=#{filters.map { |f| CGI.escape(f) }.join(',')}"
            end

          stream = Twitter::JSONStream.connect(
            path: path,
            ssl: true,
            oauth: {
              consumer_key: agent.twitter_consumer_key,
              consumer_secret: agent.twitter_consumer_secret,
              access_key: agent.twitter_oauth_token,
              access_secret: agent.twitter_oauth_token_secret
            }
          )

          stream.each_item(&block)

          stream.on_error do |message|
            STDERR.puts " --> Twitter error: #{message} at #{Time.now} <--"
            STDERR.puts " --> Sleeping for 15 seconds"
            sleep 15
            restart!
          end

          stream.on_no_data do
            STDERR.puts " --> Got no data for awhile; trying to reconnect at #{Time.now} <--"
            restart!
          end

          stream.on_max_reconnects do |_timeout, _retries|
            STDERR.puts " --> Oops, tried too many times! at #{Time.now} <--"
            sleep 60
            restart!
          end
        end

        # Decodes one stream item and forwards it to the matching agents.
        # Skips deletion notices, items without text, retweets and tweet ids
        # seen among the last DUPLICATE_DETECTION_LENGTH tweets.
        def handle_status(status)
          status = JSON.parse(status) if status.is_a?(String)
          return unless status
          return if status.key?('delete')
          return unless status['text']

          status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')

          retweet = status["retweeted_status"]
          return if retweet.present? && retweet.is_a?(Hash)

          if @recent_tweets.include?(status["id_str"])
            puts "(#{Time.now}) Skipping duplicate tweet: #{status["text"]}"
            return
          end
          @recent_tweets << status["id_str"]
          @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH

          # Split the tweet once; every filter is compared against the same words.
          tweet_words = status["text"].downcase.split(SEPARATOR)
          @filter_to_agent_map.each do |filter, agents|
            # All words of the filter must appear in the tweet, in any order.
            filter_words = filter.downcase.split(SEPARATOR).reject(&:empty?)
            next unless (filter_words - tweet_words).empty?

            agents.each do |agent|
              puts "(#{Time.now}) #{agent.name} received: #{status["text"]}"
              AgentRunner.with_connection do
                agent.process_tweet(filter, status)
              end
            end
          end
        end
      end
    end
  end