digest_agent.rb 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. module Agents
  2. class DigestAgent < Agent
  3. include FormConfigurable
  4. default_schedule "6am"
  5. description <<~MD
  6. The Digest Agent collects any Events sent to it and emits them as a single event.
  7. The resulting Event will have a payload message of `message`. You can use liquid templating in the `message`, have a look at the [Wiki](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) for details.
  8. Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
  9. If `retained_events` is set to 0 (the default), all received events are cleared after a digest is sent. Set `retained_events` to a value larger than 0 to keep a certain number of events around on a rolling basis to re-send in future digests.
  10. For instance, say `retained_events` is set to 3 and the Agent has received Events `5`, `4`, and `3`. When a digest is sent, Events `5`, `4`, and `3` are retained for a future digest. After Event `6` is received, the next digest will contain Events `6`, `5`, and `4`.
  11. MD
  12. event_description <<~MD
  13. Events look like this:
  14. {
  15. "events": [ event list ],
  16. "message": "Your message"
  17. }
  18. MD
  19. def default_options
  20. {
  21. "expected_receive_period_in_days" => "2",
  22. "message" => "{{ events | map: 'message' | join: ',' }}",
  23. "retained_events" => "0"
  24. }
  25. end
  26. form_configurable :message, type: :text
  27. form_configurable :expected_receive_period_in_days
  28. form_configurable :retained_events
  29. def validate_options
  30. errors.add(:base,
  31. 'retained_events must be 0 to 999') unless options['retained_events'].to_i >= 0 && options['retained_events'].to_i < 1000
  32. end
  33. def working?
  34. last_receive_at && last_receive_at > interpolated["expected_receive_period_in_days"].to_i.days.ago && !recent_error_logs?
  35. end
  36. def receive(incoming_events)
  37. self.memory["queue"] ||= []
  38. incoming_events.each do |event|
  39. self.memory["queue"] << event.id
  40. end
  41. if interpolated["retained_events"].to_i > 0 && memory["queue"].length > interpolated["retained_events"].to_i
  42. memory["queue"].shift(memory["queue"].length - interpolated["retained_events"].to_i)
  43. end
  44. end
  45. def check
  46. if self.memory["queue"] && self.memory["queue"].length > 0
  47. events = received_events.where(id: self.memory["queue"]).order(id: :asc).to_a
  48. payload = { "events" => events.map { |event| event.payload } }
  49. payload["message"] = interpolated(payload)["message"]
  50. create_event(payload:)
  51. if interpolated["retained_events"].to_i == 0
  52. self.memory["queue"] = []
  53. end
  54. end
  55. end
  56. end
  57. end