12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
module Agents
  # Re-emits an incoming event only when its de-duplication key has not been
  # seen before. Previously seen keys are kept in `memory['properties']`,
  # oldest first, and are bounded by the `lookback` option (0 = unbounded).
  class DeDuplicationAgent < Agent
    include FormConfigurable

    cannot_be_scheduled!

    description <<~MD
      The De-duplication Agent receives a stream of events and remits the event if it is not a duplicate.
      `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)
      `lookback` amount of past Events to compare the value to (0 for unlimited)
      `expected_update_period_in_days` is used to determine if the Agent is working.
    MD

    event_description <<~MD
      The DeDuplicationAgent just reemits events it received.
    MD

    # Options a freshly created agent starts with.
    def default_options
      {
        'property' => '{{value}}',
        'lookback' => 100,
        'expected_update_period_in_days' => 1
      }
    end

    form_configurable :property
    form_configurable :lookback
    form_configurable :expected_update_period_in_days

    after_initialize :initialize_memory

    # Makes sure the list of seen keys exists so the private helpers can rely
    # on `memory['properties']` being an array.
    def initialize_memory
      memory['properties'] ||= []
    end

    # Adds a validation error unless both `lookback` and
    # `expected_update_period_in_days` are present. `property` may be blank
    # (the whole payload is then used as the key, see #handle).
    def validate_options
      unless options['lookback'].present? && options['expected_update_period_in_days'].present?
        errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
      end
    end

    # The agent counts as working when it created an event within the
    # configured period and has no recent error logs.
    def working?
      event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
    end

    # Processes each incoming event with the options interpolated against
    # that event.
    def receive(incoming_events)
      incoming_events.each do |event|
        handle(interpolated(event), event)
      end
    end

    private

    # Emits a copy of `event` if its key is not in memory yet, then records
    # the key; otherwise only logs that the event was a duplicate.
    #
    # opts  - options interpolated for this event ('property', 'lookback').
    # event - the incoming event; its payload is re-emitted unchanged.
    def handle(opts, event = nil)
      # A blank (un-interpolated) `property` option means "use the whole payload".
      property = get_hash(options['property'].blank? ? JSON.dump(event.payload) : opts['property'])
      if is_unique?(property)
        outbound_event = create_event payload: event.payload
        log(
          "Propagating new event as '#{property}' is a new unique property.",
          inbound_event: event,
          outbound_event:
        )
        update_memory(property, opts['lookback'].to_i)
      else
        log("Not propagating as incoming event is a duplicate.", inbound_event: event)
      end
    end

    # Returns the key stored in memory for `property`: values longer than 10
    # characters are replaced by their CRC32 checksum rendered as a decimal
    # string, shorter values are returned unchanged.
    def get_hash(property)
      if property.to_s.length > 10
        Zlib.crc32(property).to_s
      else
        property
      end
    end

    # True when `property` is not among the remembered keys.
    def is_unique?(property)
      !memory['properties'].include?(property)
    end

    # Appends `property` to the remembered keys, first evicting the oldest
    # entries so that at most `amount` keys remain afterwards.
    #
    # amount - maximum number of keys to keep; 0 (or a negative value) means
    #          no limit.
    #
    # The list is trimmed whenever it is at *or above* the limit, not only
    # when it is exactly at the limit. Otherwise lowering `lookback` on an
    # agent whose memory is already larger would disable eviction for good
    # and let the list grow without bound.
    def update_memory(property, amount)
      seen = memory['properties']
      if amount.positive?
        # Number of entries to drop so that, after the push below, exactly
        # `amount` entries are left.
        overflow = seen.length - amount + 1
        seen.shift(overflow) if overflow.positive?
      end
      seen.push(property)
    end
  end
end
|