twitter_stream_agent.rb 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. module Agents
  2. class TwitterStreamAgent < Agent
  # Mixes in TwitterConcern. NOTE(review): presumably this supplies the Twitter
  # credential handling referred to in the description below — confirm against the concern.
  3. include TwitterConcern
  # Class-level declaration. NOTE(review): by its name, this marks the agent as one
  # that does not accept incoming events — confirm against Agent.
  4. cannot_receive_events!
  # User-facing Markdown help text: explains `filters`, where credentials come from,
  # `expected_update_period_in_days`, and the two `generate` modes.
  5. description <<-MD
  6. The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.
  7. To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
  8. If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
  9. Twitter credentials must be supplied as either [credentials](/user_credentials) called
  10. `twitter_consumer_key`, `twitter_consumer_secret`, `twitter_oauth_token`, and `twitter_oauth_token_secret`,
  11. or as options to this Agent called `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`.
  12. To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).
  13. Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
  14. `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
  15. MD
  # User-facing sample payloads for the events emitted in `counts` mode and in `events` mode.
  16. event_description <<-MD
  17. When in `counts` mode, TwitterStreamAgent events look like:
  18. {
  19. "filter": "hello world",
  20. "count": 25,
  21. "time": 3456785456
  22. }
  23. When in `events` mode, TwitterStreamAgent events look like:
  24. {
  25. "filter": "selectorgadget",
  26. ... every Tweet field, including ...
  27. "text": "something",
  28. "user": {
  29. "name": "Mr. Someone",
  30. "screen_name": "Someone",
  31. "location": "Vancouver BC Canada",
  32. "description": "...",
  33. "followers_count": 486,
  34. "friends_count": 1983,
  35. "created_at": "Mon Aug 29 23:38:14 +0000 2011",
  36. "time_zone": "Pacific Time (US & Canada)",
  37. "statuses_count": 3807,
  38. "lang": "en"
  39. },
  40. "retweet_count": 0,
  41. "entities": ...
  42. "lang": "en"
  43. }
  44. MD
  # Default schedule for this agent. Per the description above, `counts` mode emits
  # its summaries whenever the agent is scheduled.
  45. default_schedule "11pm"
  # Records a validation error on :base unless all three required options
  # (`filters`, `expected_update_period_in_days`, `generate`) are present.
  #
  # Returns nil when every option is present; otherwise whatever `errors.add` returns.
  def validate_options
    required_keys = %w[filters expected_update_period_in_days generate]
    # Checked in the listed order, stopping at the first missing option.
    return if required_keys.all? { |key| options[key].present? }

    errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
  end
  # Health check: truthy only when an event was created within the configured
  # `expected_update_period_in_days` window and there are no recent error logs.
  def working?
    update_period = interpolated_options['expected_update_period_in_days']
    produced_recently = event_created_within?(update_period)
    # Short-circuits: error logs are only consulted when an event was seen in time.
    produced_recently && !recent_error_logs?
  end
  # Option values used to pre-populate a newly created agent: two placeholder
  # filters, a two-day expected update period, and `events` generation mode.
  #
  # Returns a fresh Hash on every call.
  def default_options
    defaults = {}
    defaults['filters'] = ['keyword1', 'keyword2']
    defaults['expected_update_period_in_days'] = '2'
    defaults['generate'] = 'events'
    defaults
  end
  # Handles one tweet that matched `filter` on the stream.
  #
  # filter - the filter term the tweet matched; resolved to its primary name via
  #          lookup_filter.
  # status - the tweet payload; must respond to `merge` (it is merged with a
  #          'filter' key when emitted as an event).
  #
  # In `counts` mode the per-filter tally stored in the agent's memory is bumped
  # and saved; in any other mode an event carrying the tweet is created.
  # Returns nil when lookup_filter yields nothing.
  def process_tweet(filter, status)
    primary = lookup_filter(filter)
    return unless primary

    unless interpolated_options['generate'] == "counts"
      return create_event(:payload => status.merge('filter' => primary))
    end

    # Avoid memory pollution by working on a freshly loaded copy of the Agent.
    agent = Agent.find(id)
    agent.memory['filter_counts'] ||= {}
    agent.memory['filter_counts'][primary] = (agent.memory['filter_counts'][primary] || 0) + 1
    remove_unused_keys!(agent, 'filter_counts')
    agent.save!
  end
  # Scheduled run. In `counts` mode, emits one summary event per filter with a
  # non-empty tally in memory ('filter', 'count', and the current Unix 'time'),
  # then resets the tallies. In any other mode it only resets the tallies.
  #
  # Returns the empty Hash assigned to memory['filter_counts'].
  def check
    counts_mode = interpolated_options['generate'] == "counts"
    tallies = memory['filter_counts']

    if counts_mode && tallies && tallies.length > 0
      tallies.each do |name, total|
        summary = { 'filter' => name, 'count' => total, 'time' => Time.now.to_i }
        create_event(:payload => summary)
      end
    end

    # Always start the next period from a clean slate.
    memory['filter_counts'] = {}
  end
  87. protected
  # Resolves a matched filter term to the name it should be reported under.
  #
  # filter - the term to look up.
  #
  # Each configured entry in interpolated_options['filters'] is either a single
  # filter or an Array whose first element is the primary name and whose
  # remaining elements are aliases.
  #
  # Returns `filter` itself when it equals a configured entry, the primary
  # (first) element when it appears in an alias Array, and nil when it is not
  # configured at all.
  def lookup_filter(filter)
    # Array() tolerates a missing (nil) or single-value `filters` option.
    Array(interpolated_options['filters']).each do |known_filter|
      return filter if known_filter == filter
      return known_filter.first if known_filter.is_a?(Array) && known_filter.include?(filter)
    end

    # Explicit nil: falling off the end of `each` would return the (truthy)
    # filters collection, so callers could not tell a miss from a match.
    nil
  end
  # Drops entries from agent.memory[base] whose key is no longer the primary
  # name of any configured filter.
  #
  # agent - object exposing `memory` and `interpolated_options`.
  # base  - the memory key holding the per-filter Hash (e.g. 'filter_counts').
  #
  # Returns nil when agent.memory[base] is unset; otherwise the Array of keys
  # that were removed.
  def remove_unused_keys!(agent, base)
    return unless agent.memory[base]

    # An Array entry contributes only its first element; everything is compared as a String.
    primary_names = agent.interpolated_options['filters'].map do |entry|
      (entry.is_a?(Array) ? entry.first : entry).to_s
    end

    stale_keys = agent.memory[base].keys - primary_names
    stale_keys.each { |key| agent.memory[base].delete(key) }
  end
  106. end
  107. end