123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
module Agents
  # Agent that re-emits an incoming event only when the value selected by the
  # `property` option (or the whole payload when `property` is blank) is not
  # already present in the history kept in `memory['properties']`.
  #
  # The history holds at most `lookback` entries (0 means unlimited); the
  # oldest entries are dropped first.
  class DeDuplicationAgent < Agent
    include FormConfigurable

    cannot_be_scheduled!

    description <<-MD
      The DeDuplicationAgent receives a stream of events and remits the event if it is not a duplicate.
      `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)
      `lookback` amount of past Events to compare the value to (0 for unlimited)
      `expected_update_period_in_days` is used to determine if the Agent is working.
    MD

    event_description <<-MD
      The DeDuplicationAgent just reemits events it received.
    MD

    # Options a freshly created agent starts with.
    def default_options
      {
        'property' => '{{value}}',
        'lookback' => 100,
        'expected_update_period_in_days' => 1
      }
    end

    form_configurable :property
    form_configurable :lookback
    form_configurable :expected_update_period_in_days

    before_create :initialize_memory

    # Starts the agent with an empty history of seen values.
    def initialize_memory
      memory['properties'] = []
    end

    # Adds a validation error unless both `lookback` and
    # `expected_update_period_in_days` are present. `property` may be blank.
    def validate_options
      unless options['lookback'].present? && options['expected_update_period_in_days'].present?
        errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
      end
    end

    # True when an event was created within the configured period and there
    # are no recent error logs.
    def working?
      event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
    end

    # Runs the de-duplication check on every incoming event, interpolating
    # the options against each event individually.
    def receive(incoming_events)
      incoming_events.each do |event|
        handle(interpolated(event), event)
      end
    end

    private

    # Re-emits `event` when its de-duplication key has not been seen yet and
    # records the key; otherwise only logs that the event was skipped.
    #
    # opts  - the options interpolated against `event`.
    # event - the incoming event. It is dereferenced unconditionally, so
    #         despite the nil default it must be provided.
    def handle(opts, event = nil)
      # The raw (non-interpolated) option decides whether a property was
      # configured at all; a blank one means "compare the whole payload".
      property = get_hash(options['property'].blank? ? JSON.dump(event.payload) : opts['property'])
      if is_unique?(property)
        create_event :payload => event.payload
        log("Propagating new event as '#{property}' is a new unique property.", :inbound_event => event )
        update_memory(property, opts['lookback'].to_i)
      else
        log("Not propagating as incoming event is a duplicate.", :inbound_event => event )
      end
    end

    # Returns the key stored for `property`: values whose string form is
    # longer than 10 characters are replaced by their CRC32 checksum (as a
    # String), shorter values are returned unchanged.
    def get_hash(property)
      text = property.to_s
      # Hash the stringified value: Zlib.crc32 only accepts Strings.
      text.length > 10 ? Zlib.crc32(text).to_s : property
    end

    # True when `property` is not in the remembered history.
    def is_unique?(property)
      !seen_properties.include?(property)
    end

    # Appends `property` to the history, first dropping the oldest entries so
    # that at most `amount` entries remain afterwards. An `amount` of 0 (or
    # less) leaves the history unbounded.
    def update_memory(property, amount)
      history = seen_properties
      if amount > 0
        # Trim with >= semantics rather than ==, so the history also shrinks
        # when `lookback` was lowered after it had already grown past it.
        overflow = history.length - amount + 1
        history.shift(overflow) if overflow > 0
      end
      history.push(property)
    end

    # The history array, created on demand.
    # NOTE(review): presumably `memory` can lose the 'properties' key after
    # creation (e.g. if it is reset elsewhere) -- confirm; this guard keeps
    # the lookups from failing on nil in that case.
    def seen_properties
      memory['properties'] ||= []
    end
  end
end
|