delay_agent.rb 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. module Agents
  2. class DelayAgent < Agent
  3. include FormConfigurable
  4. default_schedule 'every_12h'
  5. description <<~MD
  6. The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
  7. `max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
  8. reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
  9. whether you set `keep` to `newest` or `oldest`.
  10. `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
  11. that you anticipate passing without this Agent receiving an incoming Event.
  12. `emit_interval` specifies the interval in seconds between emitting events. This is zero (no interval) by default.
  13. `max_emitted_events` is used to limit the number of the maximum events which should be created. If you omit this DelayAgent will create events for every event stored in the memory.
  14. # Ordering Events
  15. #{description_events_order("events in which buffered events are emitted")}
  16. MD
  17. def default_options
  18. {
  19. 'expected_receive_period_in_days' => 10,
  20. 'max_events' => 100,
  21. 'keep' => 'newest',
  22. 'max_emitted_events' => '',
  23. 'emit_interval' => 0,
  24. 'events_order' => [],
  25. }
  26. end
  27. form_configurable :expected_receive_period_in_days, type: :number, html_options: { min: 1 }
  28. form_configurable :max_events, type: :number, html_options: { min: 1 }
  29. form_configurable :keep, type: :array, values: %w[newest oldest]
  30. form_configurable :max_emitted_events, type: :number, html_options: { min: 0 }
  31. form_configurable :emit_interval, type: :number, html_options: { min: 0, step: 0.001 }
  32. form_configurable :events_order, type: :json
  33. def validate_options
  34. unless options['expected_receive_period_in_days'].present? && options['expected_receive_period_in_days'].to_i > 0
  35. errors.add(:base,
  36. "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
  37. end
  38. unless options['keep'].present? && options['keep'].in?(%w[newest oldest])
  39. errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
  40. end
  41. unless interpolated['max_events'].present? && interpolated['max_events'].to_i > 0
  42. errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
  43. end
  44. if interpolated['max_emitted_events'].present?
  45. unless interpolated['max_emitted_events'].to_i > 0
  46. errors.add(:base, "The 'max_emitted_events' option is optional and should be an integer greater than 0")
  47. end
  48. end
  49. unless interpolated['emit_interval'] in nil | 0.. | /\A\d+(?:\.\d+)?\z/
  50. errors.add(:base, "The 'emit_interval' option should be a non-negative number if set")
  51. end
  52. end
  53. def working?
  54. last_receive_at && last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
  55. end
  56. def receive(incoming_events)
  57. save!
  58. with_lock do
  59. incoming_events.each do |event|
  60. event_ids = memory['event_ids'] || []
  61. event_ids << event.id
  62. if event_ids.length > interpolated['max_events'].to_i
  63. if options['keep'] == 'newest'
  64. event_ids.shift
  65. else
  66. event_ids.pop
  67. end
  68. end
  69. memory['event_ids'] = event_ids
  70. end
  71. end
  72. end
  73. private def extract_emitted_events!
  74. save!
  75. with_lock do
  76. emitted_events = received_events.where(id: memory['event_ids']).reorder(:id).to_a
  77. if interpolated[SortableEvents::EVENTS_ORDER_KEY].present?
  78. emitted_events = sort_events(emitted_events)
  79. end
  80. max_emitted_events = interpolated['max_emitted_events'].presence&.to_i
  81. if max_emitted_events&.< emitted_events.length
  82. emitted_events[max_emitted_events..] = []
  83. end
  84. memory['event_ids'] -= emitted_events.map(&:id)
  85. save!
  86. emitted_events
  87. end
  88. end
  89. def check
  90. return if memory['event_ids'].blank?
  91. interval = (options['emit_interval'].presence&.to_f || 0).clamp(0..)
  92. extract_emitted_events!.each_with_index do |event, i|
  93. sleep interval unless i.zero?
  94. create_event payload: event.payload
  95. end
  96. end
  97. end
  98. end