123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- module Agents
- class DelayAgent < Agent
- include FormConfigurable
- default_schedule 'every_12h'
- description <<~MD
- The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
- `max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
- reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
- whether you set `keep` to `newest` or `oldest`.
- `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
- that you anticipate passing without this Agent receiving an incoming Event.
- `emit_interval` specifies the interval in seconds between emitting events. This is zero (no interval) by default.
- `max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory.
- # Ordering Events
- #{description_events_order("events in which buffered events are emitted")}
- MD
- def default_options
- {
- 'expected_receive_period_in_days' => 10,
- 'max_events' => 100,
- 'keep' => 'newest',
- 'max_emitted_events' => '',
- 'emit_interval' => 0,
- 'events_order' => [],
- }
- end
- form_configurable :expected_receive_period_in_days, type: :number, html_options: { min: 1 }
- form_configurable :max_events, type: :number, html_options: { min: 1 }
- form_configurable :keep, type: :array, values: %w[newest oldest]
- form_configurable :max_emitted_events, type: :number, html_options: { min: 0 }
- form_configurable :emit_interval, type: :number, html_options: { min: 0, step: 0.001 }
- form_configurable :events_order, type: :json
- def validate_options
- unless options['expected_receive_period_in_days'].present? && options['expected_receive_period_in_days'].to_i > 0
- errors.add(:base,
- "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
- end
- unless options['keep'].present? && options['keep'].in?(%w[newest oldest])
- errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
- end
- unless interpolated['max_events'].present? && interpolated['max_events'].to_i > 0
- errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
- end
- if interpolated['max_emitted_events'].present?
- unless interpolated['max_emitted_events'].to_i > 0
- errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0")
- end
- end
- unless interpolated['emit_interval'] in nil | 0.. | /\A\d+(?:\.\d+)?\z/
- errors.add(:base, "The 'emit_interval' option should be a non-negative number if set")
- end
- end
- def working?
- last_receive_at && last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
- end
- def receive(incoming_events)
- incoming_events.each do |event|
- event_ids = memory['event_ids'] || []
- event_ids << event.id
- if event_ids.length > interpolated['max_events'].to_i
- if options['keep'] == 'newest'
- event_ids.shift
- else
- event_ids.pop
- end
- end
- memory['event_ids'] = event_ids
- end
- end
- def check
- return if memory['event_ids'].blank?
- events = received_events.where(id: memory['event_ids']).reorder(:id).to_a
- if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
- events = sort_events(events)
- end
- max_emitted_events = interpolated['max_emitted_events'].presence&.to_i
- if max_emitted_events&.< events.length
- events[max_emitted_events..] = []
- end
- interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)
- events.each_with_index do |event, i|
- sleep interval unless i.zero?
- create_event payload: event.payload
- memory['event_ids'].delete(event.id)
- end
- end
- end
- end
|