de_duplication_agent.rb 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  module Agents
    # Receives events and re-emits only those whose `property` value is not
    # already recorded in memory['properties'].
    #
    # memory['properties'] is kept as a first-in, first-out list: once it holds
    # more than `lookback` values the oldest ones are dropped (a `lookback` of
    # 0 keeps every value).
    class DeDuplicationAgent < Agent
      include FormConfigurable

      cannot_be_scheduled!

      description <<~MD
        The De-duplication Agent receives a stream of events and remits the event if it is not a duplicate.

        `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)

        `lookback` amount of past Events to compare the value to (0 for unlimited)

        `expected_update_period_in_days` is used to determine if the Agent is working.
      MD

      event_description <<~MD
        The DeDuplicationAgent just reemits events it received.
      MD

      # Option values used for a newly created agent.
      def default_options
        {
          'property' => '{{value}}',
          'lookback' => 100,
          'expected_update_period_in_days' => 1
        }
      end

      form_configurable :property
      form_configurable :lookback
      form_configurable :expected_update_period_in_days

      after_initialize :initialize_memory

      # Makes sure memory['properties'] exists so the list of seen values can
      # be read and appended to without nil checks.
      def initialize_memory
        memory['properties'] ||= []
      end

      # Adds a validation error when `lookback` or
      # `expected_update_period_in_days` is blank.
      def validate_options
        return if options['lookback'].present? && options['expected_update_period_in_days'].present?

        errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
      end

      # True when an event was created within the configured update period and
      # there are no recent error logs (both checks are provided outside this
      # class).
      def working?
        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
      end

      # Handles every incoming event, using the options interpolated against
      # that particular event.
      def receive(incoming_events)
        incoming_events.each do |event|
          handle(interpolated(event), event)
        end
      end

      private

      # Re-emits `event` when its uniqueness key has not been seen yet;
      # otherwise only logs that the event was skipped.
      #
      # opts  - the agent options interpolated for this event.
      # event - the incoming event; its payload is copied into the new event.
      #         The nil default is kept for signature compatibility, but the
      #         body always reads event.payload, so a real event is required.
      def handle(opts, event = nil)
        # The raw (uninterpolated) option decides whether the whole payload
        # serves as the key; the interpolated value is what gets compared.
        property = get_hash(options['property'].blank? ? JSON.dump(event.payload) : opts['property'])

        if is_unique?(property)
          create_event payload: event.payload
          log("Propagating new event as '#{property}' is a new unique property.", inbound_event: event)
          update_memory(property, opts['lookback'].to_i)
        else
          log("Not propagating as incoming event is a duplicate.", inbound_event: event)
        end
      end

      # Returns the key stored in memory for `property`.
      #
      # Values longer than 10 characters are replaced by their CRC32 checksum
      # rendered as a decimal string (presumably to keep memory small); shorter
      # values are returned unchanged. CRC32 is only 32 bits wide, so two
      # different long values can share a key and be treated as duplicates.
      def get_hash(property)
        text = property.to_s
        # Checksum the same string whose length was measured, so a non-String
        # value cannot make Zlib.crc32 raise.
        text.length > 10 ? Zlib.crc32(text).to_s : property
      end

      # True when `property` is not among the remembered keys.
      def is_unique?(property)
        !memory['properties'].include?(property)
      end

      # Remembers `property` and caps the list at `amount` entries by dropping
      # the oldest ones. An `amount` of 0 (or a negative one) means unlimited.
      #
      # Trimming by the surplus, rather than removing a single entry only when
      # the length equals `amount`, also shrinks a list that grew under a
      # larger or unlimited lookback before the option was lowered.
      def update_memory(property, amount)
        seen = memory['properties']
        seen.push(property)
        return unless amount > 0

        surplus = seen.length - amount
        seen.shift(surplus) if surplus > 0
      end
    end
  end