# twitter_stream_agent.rb (4.0 KB)
  module Agents
    # Agent that reports on tweets matching the configured `filters`.
    #
    # Depending on the `generate` option it either creates one event per
    # matching tweet (`events`), or accumulates per-filter counters in the
    # agent's memory and emits them as summary events from #check (`counts`).
    #
    # NOTE(review): `Agent`, `TwitterConcern` and helpers such as
    # `interpolated`, `memory`, `create_event`, `event_created_within?` and
    # `recent_error_logs?` are defined outside this file.
    class TwitterStreamAgent < Agent
      include TwitterConcern

      cannot_receive_events!

      description <<-MD
        The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.

        To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
        If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.

        To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.

        Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

        `generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
      MD

      event_description <<-MD
        When in `counts` mode, TwitterStreamAgent events look like:

            {
              "filter": "hello world",
              "count": 25,
              "time": 3456785456
            }

        When in `events` mode, TwitterStreamAgent events look like:

            {
              "filter": "selectorgadget",
              ... every Tweet field, including ...
              "text": "something",
              "user": {
                "name": "Mr. Someone",
                "screen_name": "Someone",
                "location": "Vancouver BC Canada",
                "description": "...",
                "followers_count": 486,
                "friends_count": 1983,
                "created_at": "Mon Aug 29 23:38:14 +0000 2011",
                "time_zone": "Pacific Time (US & Canada)",
                "statuses_count": 3807,
                "lang": "en"
              },
              "retweet_count": 0,
              "entities": ...
              "lang": "en"
            }
      MD

      default_schedule "11pm"

      # Adds a base error unless `filters`, `expected_update_period_in_days`
      # and `generate` are all present in the options.
      def validate_options
        required = %w[filters expected_update_period_in_days generate]
        return if required.all? { |key| options[key].present? }

        errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
      end

      # @return [Boolean] true when an event was created within the expected
      #   update period and there are no recent error logs.
      def working?
        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
      end

      # @return [Hash] option values used for a newly created agent.
      def default_options
        {
          'filters' => %w[keyword1 keyword2],
          'expected_update_period_in_days' => "2",
          'generate' => "events"
        }
      end

      # Handles one tweet that matched `filter`.
      #
      # In `counts` mode the counter for the filter's primary name is
      # incremented in the agent's memory; otherwise an event is created from
      # the tweet with the primary filter name merged in under 'filter'.
      # Tweets whose filter is not configured on this agent are ignored.
      #
      # @param filter [Object] filter (or alias) the tweet matched
      # @param status [#merge] tweet payload -- presumably a Hash; confirm
      #   against the caller
      def process_tweet(filter, status)
        filter = lookup_filter(filter)
        return unless filter

        if interpolated['generate'] == "counts"
          # Avoid memory pollution by reloading the Agent.
          agent = Agent.find(id)
          agent.memory['filter_counts'] ||= {}
          agent.memory['filter_counts'][filter] ||= 0
          agent.memory['filter_counts'][filter] += 1
          remove_unused_keys!(agent, 'filter_counts')
          agent.save!
        else
          create_event payload: status.merge('filter' => filter)
        end
      end

      # Scheduled run: in `counts` mode, emits one summary event per filter
      # with a recorded count. The stored counts are reset on every run,
      # whatever the mode.
      def check
        counts = memory['filter_counts']
        if interpolated['generate'] == "counts" && counts.present?
          counts.each do |filter, count|
            create_event payload: { 'filter' => filter, 'count' => count, 'time' => Time.now.to_i }
          end
        end
        memory['filter_counts'] = {}
      end

      protected

      # Resolves `filter` to the primary name of a configured filter.
      #
      # A configured entry matches when it equals `filter`, or when it is an
      # Array (primary name followed by aliases) that includes `filter`.
      #
      # @param filter [Object] filter or alias to look up
      # @return [Object, nil] the primary filter name, or nil when `filter`
      #   is not configured
      def lookup_filter(filter)
        interpolated['filters'].each do |known_filter|
          return filter if known_filter == filter
          return known_filter.first if known_filter.is_a?(Array) && known_filter.include?(filter)
        end

        # `each` evaluates to its receiver, so without this explicit nil an
        # unknown filter would yield the (truthy) filters array to the caller.
        nil
      end

      # Deletes entries of `agent.memory[base]` whose key is not the primary
      # name of one of the agent's current filters.
      #
      # @param agent [Agent] agent whose memory is pruned in place
      # @param base [String] memory key holding the per-filter hash
      def remove_unused_keys!(agent, base)
        stored = agent.memory[base]
        return unless stored

        primary_names = agent.interpolated['filters'].map { |f| f.is_a?(Array) ? f.first.to_s : f.to_s }
        (stored.keys - primary_names).each { |removed_key| stored.delete(removed_key) }
      end
    end
  end