# change_detector_agent.rb 2.6 KB

  module Agents
    # Huginn agent that watches one property of the events it receives and
    # re-emits an event (with the same payload) only when that property differs
    # from the value stored for the previously propagated event.
    #
    # The last seen value lives in the agent's `memory` under the key
    # 'last_property'. The agent is event-driven only (`cannot_be_scheduled!`).
    class ChangeDetectorAgent < Agent
      cannot_be_scheduled!

      description <<~MD
        The Change Detector Agent receives a stream of events and emits a new event when a property of the received event changes.

        `property` specifies a [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) template that expands to the property to be watched, where you can use a variable `last_property` for the last property value. If you want to detect a new lowest price, try this: `{% assign drop = last_property | minus: price %}{% if last_property == blank or drop > 0 %}{{ price | default: last_property }}{% else %}{{ last_property }}{% endif %}`

        `expected_update_period_in_days` is used to determine if the Agent is working.

        The resulting event will be a copy of the received event.
      MD

      event_description <<~MD
        This will change based on the source event. If you were receiving events from the ShellCommandAgent, your outbound event might look like:

            {
              'command' => 'pwd',
              'path' => '/home/Huginn',
              'exit_status' => '0',
              'errors' => '',
              'output' => '/home/Huginn'
            }
      MD

      # Options a freshly created agent starts with: watch the `output` field
      # of incoming events and expect activity at least once a day.
      #
      # @return [Hash{String => Object}]
      def default_options
        {
          'property' => '{{output}}',
          'expected_update_period_in_days' => 1
        }
      end

      # Records a base validation error unless both `property` and
      # `expected_update_period_in_days` are present in the options.
      def validate_options
        return if options['property'].present? && options['expected_update_period_in_days'].present?

        errors.add(:base, "The property and expected_update_period_in_days fields are all required.")
      end

      # Health check: true when `event_created_within?` accepts the configured
      # period and `recent_error_logs?` is false. Both helpers are defined
      # outside this file.
      #
      # @return [Boolean]
      def working?
        event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
      end

      # Processes each incoming event in turn.
      #
      # For every event the stored value is exposed to the Liquid templates as
      # `last_property` before the options are interpolated against that event,
      # so a `property` template can compare the new value with the old one.
      #
      # @param incoming_events [Enumerable] events to inspect; each must respond
      #   to `payload`
      def receive(incoming_events)
        incoming_events.each do |event|
          # NOTE(review): `stack` presumably scopes the `last_property` binding
          # to this single event; it is defined outside this file - confirm.
          interpolation_context.stack do
            interpolation_context['last_property'] = last_property
            handle(interpolated(event), event)
          end
        end
      end

      private

      # Emits a copy of `event` when the interpolated `property` differs from
      # the stored one, then remembers the new value; otherwise only logs.
      #
      # `event` is required: the body always reads `event.payload`, so the
      # former `nil` default could only ever raise NoMethodError.
      #
      # @param opts [Hash] options already interpolated against `event`
      # @param event [#payload] the incoming event being examined
      def handle(opts, event)
        property = opts['property']

        if has_changed?(property)
          created_event = create_event payload: event.payload
          # Log before updating memory so the message still shows the old value.
          log("Propagating new event as property has changed to #{property} from #{last_property}",
              outbound_event: created_event, inbound_event: event)
          update_memory(property)
        else
          log("Not propagating as incoming event has not changed from #{last_property}.", inbound_event: event)
        end
      end

      # @param property [Object] freshly interpolated property value
      # @return [Boolean] true when it differs from the stored value
      def has_changed?(property)
        property != last_property
      end

      # @return [Object, nil] value stored under memory['last_property']
      def last_property
        memory['last_property']
      end

      # Stores `property` under memory['last_property'] for the next comparison.
      def update_memory(property)
        memory['last_property'] = property
      end
    end
  end