1
0

gap_detector_agent.rb 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. module Agents
  2. class GapDetectorAgent < Agent
  3. default_schedule "every_10m"
  4. description <<~MD
  5. The Gap Detector Agent will watch for holes or gaps in a stream of incoming Events and generate "no data alerts".
  6. The `value_path` value is a [JSONPath](http://goessner.net/articles/JsonPath/) to a value of interest. If either
  7. this value is empty, or no Events are received, during `window_duration_in_days`, an Event will be created with
  8. a payload of `message`.
  9. MD
  10. event_description <<~MD
  11. Events look like:
  12. {
  13. "message": "No data has been received!",
  14. "gap_started_at": "1234567890"
  15. }
  16. MD
  17. def validate_options
  18. unless options['message'].present?
  19. errors.add(:base, "message is required")
  20. end
  21. unless options['window_duration_in_days'].present? && options['window_duration_in_days'].to_f > 0
  22. errors.add(:base, "window_duration_in_days must be provided as an integer or floating point number")
  23. end
  24. end
  25. def default_options
  26. {
  27. 'window_duration_in_days' => "2",
  28. 'message' => "No data has been received!"
  29. }
  30. end
  31. def working?
  32. true
  33. end
  34. def receive(incoming_events)
  35. incoming_events.sort_by(&:created_at).each do |event|
  36. memory['newest_event_created_at'] ||= 0
  37. if !interpolated['value_path'].present? || Utils.value_at(event.payload, interpolated['value_path']).present?
  38. if event.created_at.to_i > memory['newest_event_created_at']
  39. memory['newest_event_created_at'] = event.created_at.to_i
  40. memory.delete('alerted_at')
  41. end
  42. end
  43. end
  44. end
  45. def check
  46. window = interpolated['window_duration_in_days'].to_f.days.ago
  47. if memory['newest_event_created_at'].present? && Time.at(memory['newest_event_created_at']) < window
  48. unless memory['alerted_at']
  49. memory['alerted_at'] = Time.now.to_i
  50. create_event payload: {
  51. message: interpolated['message'],
  52. gap_started_at: memory['newest_event_created_at']
  53. }
  54. end
  55. end
  56. end
  57. end
  58. end