twitter_action_agent.rb 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. module Agents
  2. class TwitterActionAgent < Agent
  3. include TwitterConcern
  4. cannot_be_scheduled!
  5. description <<~MD
  6. The Twitter Action Agent is able to retweet or favorite tweets from the events it receives.
  7. #{twitter_dependencies_missing if dependencies_missing?}
  8. It expects to consume events generated by twitter agents where the payload is a hash of tweet information. The existing TwitterStreamAgent is one example of a valid event producer for this Agent.
  9. To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.
  10. Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
  11. Set `retweet` to either true or false.
  12. Set `favorite` to either true or false.
  13. Set `emit_error_events` to true to emit an Event when the action failed, otherwise the action will be retried.
  14. MD
  15. def validate_options
  16. unless options['expected_receive_period_in_days'].present?
  17. errors.add(:base, "expected_receive_period_in_days is required")
  18. end
  19. unless retweet? || favorite?
  20. errors.add(:base, "at least one action must be true")
  21. end
  22. if emit_error_events?.nil?
  23. errors.add(:base, "emit_error_events must be set to 'true' or 'false'")
  24. end
  25. end
  26. def working?
  27. last_receive_at && last_receive_at > interpolated['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
  28. end
  29. def default_options
  30. {
  31. 'expected_receive_period_in_days' => '2',
  32. 'favorite' => 'false',
  33. 'retweet' => 'true',
  34. 'emit_error_events' => 'false'
  35. }
  36. end
  37. def retweet?
  38. boolify(options['retweet'])
  39. end
  40. def favorite?
  41. boolify(options['favorite'])
  42. end
  43. def emit_error_events?
  44. boolify(options['emit_error_events'])
  45. end
  46. def receive(incoming_events)
  47. tweets = tweets_from_events(incoming_events)
  48. begin
  49. twitter.favorite(tweets) if favorite?
  50. twitter.retweet(tweets) if retweet?
  51. rescue Twitter::Error => e
  52. case e
  53. when Twitter::Error::AlreadyRetweeted, Twitter::Error::AlreadyFavorited
  54. error e.message
  55. else
  56. raise e unless emit_error_events?
  57. end
  58. if emit_error_events?
  59. create_event payload: {
  60. 'success' => false,
  61. 'error' => e.message,
  62. 'tweets' => Hash[tweets.map { |t| [t.id, t.text] }],
  63. 'agent_ids' => incoming_events.map(&:agent_id),
  64. 'event_ids' => incoming_events.map(&:id)
  65. }
  66. end
  67. end
  68. end
  69. def tweets_from_events(events)
  70. events.map do |e|
  71. Twitter::Tweet.new(
  72. id: e.payload["id"],
  73. text: e.payload["expanded_text"] || e.payload["full_text"] || e.payload["text"]
  74. )
  75. end
  76. end
  77. end
  78. end