1
0

peak_detector_agent.rb 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. module Agents
  2. class PeakDetectorAgent < Agent
  # Class-level macro inherited from Agent (its definition is not visible here);
  # presumably marks this agent as driven by incoming events only — TODO confirm.
  3. cannot_be_scheduled!
  # Fallback URI template that #search_url uses when the `search_url` option is
  # blank. It contains the `q` variable that #validate_options insists on.
  4. DEFAULT_SEARCH_URL = 'https://twitter.com/search?q={q}'
  # Markdown help text describing the agent and each supported option.
  5. description <<~MD
  6. The Peak Detector Agent will watch for peaks in an event stream. When a peak is detected, the resulting Event will have a payload message of `message`. You can include extractions in the message, for example: `I saw a bar of: {{foo.bar}}`, have a look at the [Wiki](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) for details.
  7. The `value_path` value is a [JSONPath](http://goessner.net/articles/JsonPath/) to the value of interest. `group_by_path` is a JSONPath that will be used to group values, if present.
  8. Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
  9. You may set `window_duration_in_days` to change the default memory window length of `14` days, `min_peak_spacing_in_days` to change the default minimum peak spacing of `2` days (peaks closer together will be ignored), and `std_multiple` to change the default standard deviation threshold multiple of `3`.
  10. You may set `min_events` for the minimal number of accumulated events before the agent starts detecting.
  11. You may set `search_url` to point to something else than Twitter search, using the URI Template syntax defined in [RFC 6570](https://tools.ietf.org/html/rfc6570). Default value is `#{DEFAULT_SEARCH_URL}` where `{q}` will be replaced with group name.
  12. MD
  # Sample of the emitted payload; the keys mirror the hash that
  # #check_for_peak passes to create_event.
  13. event_description <<~MD
  14. Events look like:
  15. {
  16. "message": "Your message",
  17. "peak": 6,
  18. "peak_time": 3456789242,
  19. "grouped_by": "something"
  20. }
  21. MD
  # Validates the agent configuration, recording every problem on `errors`.
  #
  # Two independent checks are made:
  # * the four mandatory options must all be present;
  # * `search_url` must build as a URI template and expose a `q` variable.
  def validate_options
    mandatory = %w[expected_receive_period_in_days message value_path min_events]
    unless mandatory.all? { |key| options[key].present? }
      errors.add(:base, "expected_receive_period_in_days, value_path, min_events and message are required")
    end

    begin
      template = search_url
    rescue StandardError => e
      errors.add(:base, "search_url must be a valid URI template: #{e.message}")
    else
      # Only reached when the template was built without raising.
      errors.add(:base, "search_url must include a variable named 'q'") unless template.keys.include?('q')
    end
  end
  # Returns the default option set for this agent.
  #
  # @return [Hash{String => String}] option name => default value
  def default_options
    defaults = {}
    defaults['expected_receive_period_in_days'] = '2'
    defaults['group_by_path'] = 'filter'
    defaults['value_path'] = 'count'
    defaults['message'] = 'A peak of {{count}} was found in {{filter}}'
    defaults['min_events'] = '4'
    defaults
  end
  # Reports whether the agent looks healthy.
  #
  # Truthy only when an event has been received, that receipt is more recent
  # than `expected_receive_period_in_days` days ago, and there are no recent
  # error logs. Returns the falsy `last_receive_at` value itself when nothing
  # has been received yet.
  def working?
    received_at = last_receive_at
    return received_at unless received_at

    # The cutoff is computed only after confirming an event was received.
    cutoff = interpolated['expected_receive_period_in_days'].to_i.days.ago
    received_at > cutoff && !recent_error_logs?
  end
  # Processes a batch of incoming events in chronological order.
  #
  # Each event is assigned to its group, stored in memory, and then the group
  # is checked for a new peak.
  #
  # @param incoming_events [Enumerable] events responding to `created_at`
  def receive(incoming_events)
    chronological = incoming_events.sort_by { |evt| evt.created_at }
    chronological.each do |evt|
      bucket = group_for(evt)
      remember(bucket, evt)
      check_for_peak(bucket, evt)
    end
  end
  # Builds the URI template for search links.
  #
  # Uses the `search_url` option when it is present, otherwise
  # DEFAULT_SEARCH_URL. Errors raised by Addressable::Template.new propagate
  # to the caller.
  def search_url
    pattern = options[:search_url].presence
    pattern ||= DEFAULT_SEARCH_URL
    Addressable::Template.new(pattern)
  end
  58. private
  # Emits a peak event when the newest sample of `group` is an outlier.
  #
  # A peak is reported only when all of the following hold:
  # * the group holds more than `min_events` samples;
  # * there is no earlier peak, or the last one is older than `peak_spacing`
  #   relative to the event's timestamp;
  # * the newest value exceeds mean + std_multiple * standard deviation,
  #   where the statistics exclude the newest sample.
  #
  # NOTE(review): `min_events` is read from raw `options`, whereas the other
  # tunables go through `interpolated` — confirm this is intentional.
  def check_for_peak(group, event)
    memory['peaks'] ||= {}
    memory['peaks'][group] ||= []
    peaks = memory['peaks'][group]
    samples = memory['data'][group]

    return if samples.length <= options['min_events'].to_i
    return unless peaks.empty? || peaks.last < event.created_at.to_i - peak_spacing

    average_value, standard_deviation = stats_for(group, skip_last: 1)
    newest_value, newest_time = samples.last.map(&:to_f)
    return unless newest_value > average_value + std_multiple * standard_deviation

    # Record the peak, then forget peaks that fell out of the memory window.
    peaks << newest_time
    expiry = newest_time - window_duration
    peaks.reject! { |peak_time| peak_time <= expiry }

    create_event payload: {
      'message' => interpolated(event)['message'],
      'peak' => newest_value,
      'peak_time' => newest_time,
      'grouped_by' => group.to_s
    }
  end
  # Computes the mean and population standard deviation of a group's values.
  #
  # @param group [Object] key into memory['data']
  # @param options [Hash] `:skip_last` — how many of the newest samples to
  #   leave out of the calculation (defaults to 0)
  # @return [Array(Float, Float)] `[mean, standard_deviation]`; both are NaN
  #   when no samples remain, because the sums are divided by a Float zero
  def stats_for(group, options = {})
    omit = options[:skip_last] || 0
    values = memory['data'][group].map { |sample| sample.first.to_f }
    values = values[0...(values.length - omit)]
    count = values.length.to_f

    # Plain left-to-right folds keep the floating-point results identical to
    # a naive accumulation loop.
    mean = values.inject(0) { |total, value| total + value } / count
    squared_error = values.inject(0) { |total, value| total + (value - mean)**2 }

    [mean, Math.sqrt(squared_error / count)]
  end
  # Length of the memory window, as subtracted from `created_at.to_i`
  # timestamps when old samples and peaks are discarded.
  #
  # The older `window_duration` option wins when present and is used as-is
  # (converted with `to_i`). Otherwise `window_duration_in_days` is converted
  # with `.days`, defaulting to 14.
  #
  # A blank `window_duration_in_days` (e.g. an empty form field) is treated as
  # unset. Without `.presence` the empty string is truthy, so `"" || 14` would
  # yield a zero-length window and every stored sample would be dropped.
  def window_duration
    legacy = interpolated['window_duration'].presence # The older option
    return legacy.to_i if legacy

    (interpolated['window_duration_in_days'].presence || 14).to_f.days
  end
  # Number of standard deviations above the mean a value must exceed to count
  # as a peak.
  #
  # Defaults to 3 when `std_multiple` is unset or blank. A blank string must
  # not be passed to `to_f`: it would become 0.0 and flag every value above
  # the mean as a peak.
  #
  # @return [Float]
  def std_multiple
    (interpolated['std_multiple'].presence || 3).to_f
  end
  # Minimum distance required between two reported peaks, as subtracted from
  # `created_at.to_i` timestamps.
  #
  # The older `peak_spacing` option wins when present and is used as-is
  # (converted with `to_i`). Otherwise `min_peak_spacing_in_days` is converted
  # with `.days`, defaulting to 2.
  #
  # A blank `min_peak_spacing_in_days` is treated as unset. Without
  # `.presence` the empty string would turn into a spacing of zero and
  # silently disable the spacing.
  def peak_spacing
    legacy = interpolated['peak_spacing'].presence # The older option
    return legacy.to_i if legacy

    (interpolated['min_peak_spacing_in_days'].presence || 2).to_f.days
  end
  # Determines which group an event belongs to.
  #
  # Looks up `group_by_path` in the event payload. Falls back to the literal
  # 'no_group' when the option is blank or the lookup yields nil/false.
  #
  # @param event [#payload]
  # @return [Object] the value found at the path, or 'no_group'
  def group_for(event)
    path = interpolated['group_by_path'].presence
    return 'no_group' unless path

    Utils.value_at(event.payload, path) || 'no_group'
  end
  # Stores the event's value and timestamp in the group's series, then prunes
  # samples that are outside the memory window.
  #
  # Each sample is stored as `[value.to_f, created_at.to_i]`.
  #
  # @param group [Object] key into memory['data']
  # @param event [#payload, #created_at]
  def remember(group, event)
    memory['data'] ||= {}
    memory['data'][group] ||= []

    reading = Utils.value_at(event.payload, interpolated['value_path']).to_f
    memory['data'][group] << [reading, event.created_at.to_i]
    cleanup(group)
  end
  # Drops samples of `group` that fell out of the memory window.
  #
  # The window is measured back from the newest stored sample's timestamp. A
  # sample exactly `window_duration` old is removed as well (`<=`).
  #
  # @param group [Object] key into memory['data']; the series must be non-empty
  # @return [Array, nil] the series when something was removed, otherwise nil
  def cleanup(group)
    series = memory['data'][group]
    cutoff = series.last.last - window_duration
    series.reject! { |_value, recorded_at| recorded_at <= cutoff }
  end
  127. end
  128. end