peak_detector_agent.rb 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. require 'pp'
  2. module Agents
  3. class PeakDetectorAgent < Agent
  4. include LiquidInterpolatable
  5. cannot_be_scheduled!
  # Markdown help text passed to the `description` class macro.
  # Paragraphs are separated by blank lines so they are not merged when rendered.
  description <<-MD
Use a PeakDetectorAgent to watch for peaks in an event stream. When a peak is detected, the resulting Event will have a payload message of `message`. You can include extractions in the message, for example: `I saw a bar of: {{foo.bar}}`, have a look at the [Wiki](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) for details.

The `value_path` value is a [JSONPath](http://goessner.net/articles/JsonPath/) to the value of interest. `group_by_path` is a hash path that will be used to group values, if present.

Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.

You may set `window_duration_in_days` to change the default memory window length of `14` days,
`min_peak_spacing_in_days` to change the default minimum peak spacing of `2` days (peaks closer together will be ignored), and
`std_multiple` to change the default standard deviation threshold multiple of `3`.
  MD
  # Markdown sample of an emitted event, passed to the `event_description` class macro.
  # The JSON is indented by four spaces after a blank line so that it forms a
  # Markdown code block.
  event_description <<-MD
Events look like:

    {
      "message": "Your message",
      "peak": 6,
      "peak_time": 3456789242,
      "grouped_by": "something"
    }
  MD
  # Validates the configured options.
  #
  # Adds a single base error unless `expected_receive_period_in_days`,
  # `message` and `value_path` are all present.
  def validate_options
    required_keys = %w[expected_receive_period_in_days message value_path]
    return if required_keys.all? { |key| options[key].present? }

    errors.add(:base, "expected_receive_period_in_days, value_path, and message are required")
  end
  # Returns the default option values for this agent as a string-keyed Hash.
  def default_options
    defaults = {}
    defaults['expected_receive_period_in_days'] = "2"
    defaults['group_by_path'] = "filter"
    defaults['value_path'] = "count"
    defaults['message'] = "A peak was found"
    defaults
  end
  # Reports whether the agent appears healthy: an event was received within the
  # last `expected_receive_period_in_days` days and `recent_error_logs?` is false.
  #
  # The period is parsed with `to_f` (not `to_i`) so fractional values such as
  # "0.5" are honoured instead of being truncated to zero days.
  #
  # Returns a falsy value (nil or false) when not working.
  def working?
    last_receive_at &&
      last_receive_at > options['expected_receive_period_in_days'].to_f.days.ago &&
      !recent_error_logs?
  end
  # Processes incoming events oldest-first (by `created_at`): each event's value is
  # stored under its group and the group is then checked for a new peak.
  def receive(incoming_events)
    chronological = incoming_events.sort_by { |incoming| incoming.created_at }
    chronological.each do |incoming|
      bucket = group_for(incoming)
      remember(bucket, incoming)
      check_for_peak(bucket, incoming)
    end
  end
  46. private
  # Decides whether the newest value remembered for +group+ is a peak and, if so,
  # records its time in memory['peaks'][group] and emits an event.
  #
  # A peak is only considered when the group holds more than 4 data points and the
  # last recorded peak (if any) is older than +peak_spacing+ relative to the
  # event's creation time. The newest value is a peak when it exceeds the mean of
  # the preceding values by more than +std_multiple+ standard deviations.
  #
  # Expects memory['data'][group] to have been populated already (see +remember+).
  def check_for_peak(group, event)
    memory['peaks'] ||= {}
    memory['peaks'][group] ||= []
    peaks = memory['peaks'][group]
    data = memory['data'][group]

    return unless data.length > 4
    return unless peaks.empty? || peaks.last < event.created_at.to_i - peak_spacing

    # Statistics exclude the newest point, which is the candidate being tested.
    average_value, standard_deviation = stats_for(group, :skip_last => 1)
    newest_value, newest_time = data.last.map(&:to_f)
    return unless newest_value > average_value + std_multiple * standard_deviation

    peaks << newest_time
    # Computed once (not per element): forget peaks that fell out of the window.
    oldest_allowed = newest_time - window_duration
    peaks.reject! { |peak_time| peak_time <= oldest_allowed }

    create_event :payload => {
      'message' => interpolate_string(options['message'], event.payload),
      'peak' => newest_value,
      'peak_time' => newest_time,
      'grouped_by' => group.to_s
    }
  end
  # Computes the mean and population standard deviation of the values stored for
  # +group+ in memory['data'].
  #
  # group   - key into memory['data']; each entry is a [value, time] pair.
  # options - Hash; :skip_last is the number of newest entries to leave out
  #           (default 0). Note this parameter shadows the agent's own +options+
  #           inside this method.
  #
  # Returns [mean, standard_deviation] as Floats. When no values remain (for
  # example :skip_last is at least the number of entries) both are NaN.
  def stats_for(group, options = {})
    values = memory['data'][group].map { |datum| datum.first.to_f }
    # Clamp at zero: a negative range end would wrap around and keep leading values.
    kept = [values.length - (options[:skip_last] || 0), 0].max
    values = values.first(kept)
    count = values.length.to_f

    mean = values.inject(0) { |total, value| total + value } / count
    variance = values.inject(0) { |total, value| total + (value - mean)**2 } / count

    [mean, Math.sqrt(variance)]
  end
  # Length of the memory window; entries older than this relative to the newest
  # one are discarded.
  #
  # The older `window_duration` option wins when present and is used as a plain
  # integer (presumably seconds, since it is compared with epoch timestamps).
  # Otherwise `window_duration_in_days` is used, defaulting to 14 days when it is
  # missing or blank (a blank string would otherwise parse as 0 days).
  def window_duration
    legacy_value = options['window_duration']
    return legacy_value.to_i if legacy_value.present? # The older option

    configured_days = options['window_duration_in_days']
    (configured_days.present? ? configured_days : 14).to_f.days
  end
  # Number of standard deviations above the mean a value must exceed to count as
  # a peak. Reads `std_multiple`, defaulting to 3 when it is missing or blank
  # (a blank string would otherwise parse as 0.0).
  def std_multiple
    configured = options['std_multiple']
    (configured.present? ? configured : 3).to_f
  end
  # Minimum distance required between two recorded peaks.
  #
  # The older `peak_spacing` option wins when present and is used as a plain
  # integer (presumably seconds, since it is compared with epoch timestamps).
  # Otherwise `min_peak_spacing_in_days` is used, defaulting to 2 days when it is
  # missing or blank (a blank string would otherwise parse as 0 days).
  def peak_spacing
    legacy_value = options['peak_spacing']
    return legacy_value.to_i if legacy_value.present? # The older option

    configured_days = options['min_peak_spacing_in_days']
    (configured_days.present? ? configured_days : 2).to_f.days
  end
  # Returns the grouping key for +event+: the value found at `group_by_path` in
  # the event payload, or 'no_group' when the option is blank or nothing truthy
  # is found at that path.
  def group_for(event)
    path_is_set = options['group_by_path'].present?
    grouping = path_is_set && Utils.value_at(event.payload, options['group_by_path'])
    grouping || 'no_group'
  end
  # Appends a [value, unix_time] pair for +event+ to memory['data'][group], where
  # the value is read from the payload at `value_path`, then prunes the group via
  # +cleanup+. Returns whatever +cleanup+ returns.
  def remember(group, event)
    memory['data'] ||= {}
    memory['data'][group] ||= []
    sample = [Utils.value_at(event.payload, options['value_path']), event.created_at.to_i]
    memory['data'][group].push(sample)
    cleanup(group)
  end
  # Drops entries of memory['data'][group] whose timestamp is at or before
  # (newest timestamp - window_duration).
  #
  # The cutoff is computed once up front instead of once per entry.
  # Returns the result of Array#reject! (nil when nothing was removed).
  def cleanup(group)
    samples = memory['data'][group]
    cutoff = samples.last.last - window_duration
    samples.reject! { |_value, time| time <= cutoff }
  end
  108. end
  109. end