twitter_stream_agent.rb 7.8 KB


  1. module Agents
  2. class TwitterStreamAgent < Agent
  3. include TwitterConcern
  4. include LongRunnable
  5. cannot_receive_events!
  # User-facing Markdown describing the agent and its options.
  # `include_retweets`: the stream worker drops retweets unless this option is
  # truthy, so `true` means retweets ARE included.
  description <<~MD
    The Twitter Stream Agent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.
    #{twitter_dependencies_missing if dependencies_missing?}
    To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
    If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
    To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.
    Set `include_retweets` to `true` to include retweets (default: `false`)
    Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
    `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
  MD
  # Markdown samples of the two payload shapes this agent can emit; the
  # `events` sample is produced by `tweet_event_description` with an extra
  # "filter" line passed in.
  event_description <<~MD
    When in `counts` mode, TwitterStreamAgent events look like:
    {
    "filter": "hello world",
    "count": 25,
    "time": 3456785456
    }
    When in `events` mode, TwitterStreamAgent events look like:
    #{tweet_event_description('text', %("filter": "selectorgadget",\n))}
  MD

  # Default schedule value registered for this agent.
  default_schedule("11pm")
  # Checks the configured options and records any problems on `errors`.
  #
  # * `filters`, `expected_update_period_in_days` and `generate` must all be present.
  # * `include_retweets`, when given, must be something `boolify` can turn
  #   into a boolean.
  def validate_options
    required_keys = %i[filters expected_update_period_in_days generate]
    unless required_keys.all? { |key| options[key].present? }
      errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
    end

    retweets_setting = options[:include_retweets]
    if retweets_setting.present? && boolify(retweets_setting).nil?
      errors.add(:base, "include_retweets must be a boolean value")
    end
  end
  # The agent counts as working when it created an event within the configured
  # `expected_update_period_in_days` and has no recent error logs.
  def working?
    recently_active = event_created_within?(interpolated[:expected_update_period_in_days])
    recently_active && !recent_error_logs?
  end
  # Option values used for a freshly created agent.
  #
  # @return [Hash{String => Object}] a new hash on every call
  def default_options
    {
      # Keywords to track on the stream.
      'filters' => ['keyword1', 'keyword2'],
      # Retweets are skipped unless this is switched on.
      'include_retweets' => false,
      'expected_update_period_in_days' => '2',
      # Either "events" or "counts".
      'generate' => 'events'
    }
  end
  # Handles one tweet that the stream worker matched against `filter`.
  #
  # In "counts" mode the per-filter tally in the agent's memory is bumped and
  # saved; otherwise an event carrying the tweet plus the resolved filter is
  # created. Nothing happens when `lookup_filter` yields a falsy value.
  #
  # @param filter [String] filter string as matched by the worker
  # @param status [Hash] tweet payload
  def process_tweet(filter, status)
    matched = lookup_filter(filter)
    return unless matched

    if interpolated[:generate] != "counts"
      return create_event(payload: status.merge('filter' => matched))
    end

    # Avoid memory pollution by reloading the Agent.
    reloaded = Agent.find(id)
    # Memory is re-read on every access instead of cached in a local: the
    # stored value is not guaranteed to be the same object that was assigned.
    reloaded.memory[:filter_counts] ||= {}
    reloaded.memory[:filter_counts][matched] ||= 0
    reloaded.memory[:filter_counts][matched] += 1
    remove_unused_keys!(reloaded, 'filter_counts')
    reloaded.save!
  end
  # Scheduled run: in "counts" mode, emits one summary event per filter with
  # the tally gathered so far. The tallies are reset afterwards in every mode.
  def check
    tallies = interpolated[:generate] == "counts" ? memory[:filter_counts] : nil

    if tallies.present?
      tallies.each_pair do |name, total|
        create_event payload: { 'filter' => name, 'count' => total, 'time' => Time.now.to_i }
      end
    end

    memory[:filter_counts] = {}
  end
  76. protected
  # Resolves a filter string coming from the stream worker to the filter this
  # agent is configured with.
  #
  # Comparison ignores surrounding whitespace, because the worker registers
  # filters in stripped form while the configured values are used as entered.
  #
  # @param filter [String] filter string the tweet was matched against
  # @return [Object, nil] the configured filter for a plain entry, the first
  #   (primary) element for an alias array, or nil when nothing matches
  def lookup_filter(filter)
    wanted = filter.to_s.strip

    interpolated[:filters].each do |known_filter|
      if known_filter.is_a?(Array)
        # Alias group: any member matches, the first entry is reported.
        return known_filter.first if known_filter.any? { |name| name.to_s.strip == wanted }
      elsif known_filter.to_s.strip == wanted
        return known_filter
      end
    end

    # `each` evaluates to its receiver, so without an explicit nil a miss
    # would return the whole (truthy) filter list to the caller.
    nil
  end
  # Deletes entries from `agent.memory[base]` whose key is no longer one of the
  # agent's configured filters (for an alias array, its first element).
  #
  # @param agent [Agent] agent whose memory is pruned in place
  # @param base [String] memory key holding the per-filter hash
  # @return [Array, nil] the removed keys, or nil when nothing is stored under `base`
  def remove_unused_keys!(agent, base)
    return unless agent.memory[base]

    stored_keys = agent.memory[base].keys
    configured = agent.interpolated[:filters].map do |entry|
      primary = entry.is_a?(Array) ? entry.first : entry
      primary.to_s
    end

    (stored_keys - configured).each { |stale| agent.memory[base].delete(stale) }
  end
  # Builds one stream worker per distinct Twitter OAuth token among the active
  # agents.
  #
  # Each worker receives a map from filter string to the agents interested in
  # it. Filters are flattened, stripped and only then de-duplicated, so an
  # agent that lists the same keyword twice with different padding is
  # registered once and does not process the same tweet twice.
  #
  # @return [Array<Worker>, false] the workers, or false (after printing a
  #   warning) when there are agents to serve but the Twitter dependencies are
  #   missing
  def self.setup_worker
    agents_by_token =
      Agents::TwitterStreamAgent.active.order(:id).group_by(&:twitter_oauth_token)

    if !agents_by_token.empty? && Agents::TwitterStreamAgent.dependencies_missing?
      warn Agents::TwitterStreamAgent.twitter_dependencies_missing
      STDERR.flush
      return false
    end

    agents_by_token.map do |oauth_token, agents|
      # Keys appear in first-seen order across the agents.
      filter_map = agents.each_with_object({}) do |agent, map|
        agent.options[:filters].flatten.compact.map(&:strip).uniq.each do |filter|
          (map[filter] ||= []) << agent
        end
      end

      # The worker id is derived from the filters, the subscribed agent ids
      # and the token.
      config_hash = filter_map.map { |filter, subscribers| [filter, subscribers.map(&:id)] }
      config_hash.push(oauth_token)

      Worker.new(id: agents.first.worker_id(config_hash),
                 config: { filter_to_agent_map: filter_map },
                 agent: agents.first)
    end
  end
  # Long-running worker that keeps one Twitter streaming connection open and
  # hands each incoming tweet to the agents whose filters it matches.
  class Worker < LongRunnable::Worker
    # Interval of the periodic timer that restarts the worker.
    RELOAD_TIMEOUT = 60.minutes
    # Number of recent tweet ids remembered for duplicate detection.
    DUPLICATE_DETECTION_LENGTH = 1000
    # Splits filters and tweet text into words; hyphens stay inside a word.
    SEPARATOR = /[^\w-]+/

    # Loads the streaming client and picks the filter map out of the config.
    def setup
      require 'twitter/json_stream'
      @filter_to_agent_map = @config[:filter_to_agent_map]
    end

    # Runs the EventMachine loop: schedules the periodic restart and streams
    # tweets into `handle_status`.
    def run
      @recent_tweets = []

      EventMachine.run do
        EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { restart! }

        stream!(@filter_to_agent_map.keys, @agent) do |status|
          handle_status(status)
        end
      end

      Thread.stop
    end

    # Stops the event loop if it is running, then terminates the worker thread.
    def stop
      EventMachine.stop_event_loop if EventMachine.reactor_running?
      terminate_thread!
    end

    private

    # Opens the stream and wires up its callbacks.
    #
    # @param filters [Array<String>] keywords to track; the unfiltered sample
    #   endpoint is used when none remain after joining
    # @param agent [Agent] agent whose Twitter credentials sign the request
    # @yield [status] every item received from the stream
    def stream!(filters, agent, &block)
      track = filters.map(&:downcase).uniq.join(",")
      path =
        if track.present?
          "/1.1/statuses/filter.json?#{{ track: }.to_param}"
        else
          "/1.1/statuses/sample.json"
        end

      stream = Twitter::JSONStream.connect(
        path:,
        ssl: true,
        oauth: {
          consumer_key: agent.twitter_consumer_key,
          consumer_secret: agent.twitter_consumer_secret,
          access_key: agent.twitter_oauth_token,
          access_secret: agent.twitter_oauth_token_secret
        }
      )

      stream.each_item(&block)

      # NOTE(review): the `sleep` calls below run inside stream callbacks and
      # presumably block the EventMachine reactor while waiting - confirm this
      # is intended before replacing them with timers.
      stream.on_error do |message|
        warn " --> Twitter error: #{message} at #{Time.now} <--"
        warn " --> Sleeping for 15 seconds"
        sleep 15
        restart!
      end

      stream.on_no_data do |_message|
        warn " --> Got no data for awhile; trying to reconnect at #{Time.now} <--"
        restart!
      end

      stream.on_max_reconnects do |_timeout, _retries|
        warn " --> Oops, tried too many times! at #{Time.now} <--"
        sleep 60
        restart!
      end
    end

    # Processes one stream item.
    #
    # Items without text and delete notices are ignored, retweets are dropped
    # unless the worker's agent has `include_retweets` switched on, and tweets
    # whose id was seen recently are skipped. Every remaining tweet goes to
    # each agent subscribed to a filter whose words all occur in the tweet.
    #
    # @param status [String, Hash] raw JSON or an already parsed tweet
    def handle_status(status)
      status = JSON.parse(status, symbolize_names: true) if status.is_a?(String)
      status = TwitterConcern.format_tweet(status)
      return unless status && status[:text] && !status.key?(:delete)
      return if status[:retweeted_status] && !boolify(agent.options[:include_retweets])

      if @recent_tweets.include?(status[:id_str])
        puts "(#{Time.now}) Skipping duplicate tweet: #{status[:text]}"
        return
      end

      @recent_tweets << status[:id_str]
      @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH

      # Tokenise the tweet once; every filter is compared against the same words.
      tweet_words = status[:text].downcase.split(SEPARATOR)

      @filter_to_agent_map.each do |filter, subscribers|
        missing_words = (filter.downcase.split(SEPARATOR) - tweet_words).reject(&:empty?)
        next unless missing_words.empty?

        subscribers.each do |subscriber|
          puts "(#{Time.now}) #{subscriber.name} received: #{status[:text]}"
          AgentRunner.with_connection do
            subscriber.process_tweet(filter, status)
          end
        end
      end
    end
  end
  209. end
  210. end