# delay_agent.rb (3.3 KB)

  1. module Agents
  2. class DelayAgent < Agent
  3. include FormConfigurable
  4. default_schedule 'every_12h'
  5. description <<~MD
  6. The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
  7. `max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
  8. reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
  9. whether you set `keep` to `newest` or `oldest`.
  10. `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
  11. that you anticipate passing without this Agent receiving an incoming Event.
  12. `max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory.
  13. MD
  14. def default_options
  15. {
  16. 'expected_receive_period_in_days' => '10',
  17. 'max_events' => '100',
  18. 'keep' => 'newest',
  19. 'max_emitted_events' => ''
  20. }
  21. end
  22. form_configurable :expected_receive_period_in_days, type: :string
  23. form_configurable :max_events, type: :string
  24. form_configurable :keep, type: :array, values: %w[newest oldest]
  25. form_configurable :max_emitted_events, type: :string
  26. def validate_options
  27. unless options['expected_receive_period_in_days'].present? && options['expected_receive_period_in_days'].to_i > 0
  28. errors.add(:base,
  29. "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
  30. end
  31. unless options['keep'].present? && options['keep'].in?(%w[newest oldest])
  32. errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
  33. end
  34. unless interpolated['max_events'].present? && interpolated['max_events'].to_i > 0
  35. errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
  36. end
  37. if interpolated['max_emitted_events'].present?
  38. unless interpolated['max_emitted_events'].to_i > 0
  39. errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0")
  40. end
  41. end
  42. end
  43. def working?
  44. last_receive_at && last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
  45. end
  46. def receive(incoming_events)
  47. incoming_events.each do |event|
  48. memory['event_ids'] ||= []
  49. memory['event_ids'] << event.id
  50. if memory['event_ids'].length > interpolated['max_events'].to_i
  51. if options['keep'] == 'newest'
  52. memory['event_ids'].shift
  53. else
  54. memory['event_ids'].pop
  55. end
  56. end
  57. end
  58. end
  59. def check
  60. if memory['event_ids'] && memory['event_ids'].length > 0
  61. events = received_events.where(id: memory['event_ids']).reorder('events.id asc')
  62. if interpolated['max_emitted_events'].present?
  63. events = events.limit(interpolated['max_emitted_events'].to_i)
  64. end
  65. events.each do |event|
  66. create_event payload: event.payload
  67. memory['event_ids'].delete(event.id)
  68. end
  69. end
  70. end
  71. end
  72. end