de_duplication_agent.rb 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  module Agents
    # Agent that re-emits the payload of each received event only when the
    # event's de-duplication key is not already stored in `memory['properties']`.
    #
    # The key is the interpolated `property` option, or the JSON dump of the
    # whole payload when the `property` option is blank. Keys longer than 10
    # characters are stored as their CRC32 checksum (see #get_hash).
    class DeDuplicationAgent < Agent
      include FormConfigurable

      cannot_be_scheduled!

      description <<~MD
        The De-duplication Agent receives a stream of events and remits the event if it is not a duplicate.
        `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)
        `lookback` amount of past Events to compare the value to (0 for unlimited)
        `expected_update_period_in_days` is used to determine if the Agent is working.
      MD

      event_description <<~MD
        The DeDuplicationAgent just reemits events it received.
      MD

      # Option values offered as defaults for this agent.
      #
      # @return [Hash{String => Object}]
      def default_options
        {
          'property' => '{{value}}',
          'lookback' => 100,
          'expected_update_period_in_days' => 1
        }
      end

      form_configurable :property
      form_configurable :lookback
      form_configurable :expected_update_period_in_days

      after_initialize :initialize_memory

      # Makes sure the list of remembered keys exists before it is read or
      # appended to.
      def initialize_memory
        memory['properties'] ||= []
      end

      # Adds a base validation error unless both `lookback` and
      # `expected_update_period_in_days` are present. `property` is allowed to
      # be blank (the whole payload is then used as the key, see #handle).
      def validate_options
        return if options['lookback'].present? && options['expected_update_period_in_days'].present?

        errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
      end

      # Combines the two inherited health checks: `event_created_within?` for
      # the configured period and the absence of `recent_error_logs?`.
      # NOTE(review): both helpers come from the parent class; their exact
      # semantics are not visible in this file.
      def working?
        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
      end

      # Processes every incoming event in order, interpolating the options
      # against each event before handing it to #handle.
      #
      # @param incoming_events [Enumerable] events to de-duplicate
      def receive(incoming_events)
        incoming_events.each do |event|
          handle(interpolated(event), event)
        end
      end

      private

      # Emits `event`'s payload as a new event when its key is unseen, then
      # records the key; otherwise only logs that the event was a duplicate.
      #
      # @param opts [Hash] options interpolated against `event`
      # @param event [Object] the received event; its `payload` is read
      def handle(opts, event = nil)
        # The raw (un-interpolated) option decides whether a property was
        # configured at all; the interpolated one supplies the actual value.
        raw_key = options['property'].blank? ? JSON.dump(event.payload) : opts['property']
        property = get_hash(raw_key)

        if is_unique?(property)
          outbound_event = create_event payload: event.payload
          log(
            "Propagating new event as '#{property}' is a new unique property.",
            inbound_event: event,
            outbound_event:
          )
          update_memory(property, opts['lookback'].to_i)
        else
          log("Not propagating as incoming event is a duplicate.", inbound_event: event)
        end
      end

      # Shortens a key for storage: values whose string form is longer than 10
      # characters are replaced by the decimal CRC32 of that string; shorter
      # values are returned unchanged.
      #
      # @param property [Object] the key value
      # @return [Object] the CRC32 as a String, or `property` itself
      def get_hash(property)
        text = property.to_s
        # Checksum the same string that was measured so a long non-String value
        # cannot make Zlib.crc32 raise a TypeError.
        text.length > 10 ? Zlib.crc32(text).to_s : property
      end

      # @param property [Object] key as returned by #get_hash
      # @return [Boolean] true when the key is not among the remembered keys
      def is_unique?(property)
        !memory['properties'].include?(property)
      end

      # Appends `property` to the remembered keys, first evicting the oldest
      # entries so that at most `amount` keys remain afterwards.
      #
      # Evicting "as many as needed" rather than "one when exactly full" keeps
      # the list bounded even when it is already longer than `amount`, e.g.
      # after `lookback` has been lowered.
      #
      # @param property [Object] key to remember
      # @param amount [Integer] maximum number of keys to keep; 0 (or any
      #   non-positive value) means unlimited
      # @return [Array] the updated list of remembered keys
      def update_memory(property, amount)
        remembered = memory['properties']

        if amount.positive?
          # +1 makes room for the key that is about to be pushed.
          overflow = remembered.length - amount + 1
          remembered.shift(overflow) if overflow.positive?
        end

        remembered.push(property)
      end
    end
  end