1
0

attribute_difference_agent.rb 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. module Agents
  2. class AttributeDifferenceAgent < Agent
  3. cannot_be_scheduled!
  4. description <<-MD
  5. The Attribute Difference Agent receives events and emits a new event with
  6. the difference or change of a specific attribute in comparison to the previous
  7. event received.
  8. `path` specifies the JSON path of the attribute to be used from the event.
  9. `output` specifies the new attribute name that will be created on the original payload
  10. and it will contain the difference or change.
  11. `method` specifies if it should be...
  12. * `percentage_change` eg. Previous value was `160`, new value is `116`. Percentage change is `-27.5`
  13. * `decimal_difference` eg. Previous value was `5.5`, new value is `15.2`. Difference is `9.7`
  14. * `integer_difference` eg. Previous value was `50`, new value is `40`. Difference is `-10`
  15. `decimal_precision` defaults to `3`, but you can override this if you want.
  16. `expected_update_period_in_days` is used to determine if the Agent is working.
  17. The resulting event will be a copy of the received event with the difference
  18. or change added as an extra attribute. If you use the `percentage_change` the
  19. attribute will be formatted as such `{{attribute}}_change`, otherwise it will
  20. be `{{attribute}}_diff`.
  21. All configuration options will be liquid interpolated based on the incoming event.
  22. MD
  23. event_description <<-MD
  24. This will change based on the source event.
  25. MD
  26. def default_options
  27. {
  28. 'path' => '.data.rate',
  29. 'output' => 'rate_diff',
  30. 'method' => 'integer_difference',
  31. 'expected_update_period_in_days' => 1
  32. }
  33. end
  34. def validate_options
  35. unless options['path'].present? && options['method'].present? && options['output'].present? && options['expected_update_period_in_days'].present?
  36. errors.add(:base, 'The attribute, method and expected_update_period_in_days fields are all required.')
  37. end
  38. end
  39. def working?
  40. event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
  41. end
  42. def receive(incoming_events)
  43. incoming_events.each do |event|
  44. handle(interpolated(event), event)
  45. end
  46. end
  47. private
  48. def handle(opts, event)
  49. opts['decimal_precision'] ||= 3
  50. attribute_value = Utils.value_at(event.payload, opts['path'])
  51. attribute_value = attribute_value.nil? ? 0 : attribute_value
  52. payload = event.payload.deep_dup
  53. if opts['method'] == 'percentage_change'
  54. change = calculate_percentage_change(attribute_value, opts['decimal_precision'])
  55. payload[opts['output']] = change
  56. elsif opts['method'] == 'decimal_difference'
  57. difference = calculate_decimal_difference(attribute_value, opts['decimal_precision'])
  58. payload[opts['output']] = difference
  59. elsif opts['method'] == 'integer_difference'
  60. difference = calculate_integer_difference(attribute_value)
  61. payload[opts['output']] = difference
  62. end
  63. created_event = create_event(payload: payload)
  64. log('Propagating new event', outbound_event: created_event, inbound_event: event)
  65. update_memory(attribute_value)
  66. end
  67. def calculate_integer_difference(new_value)
  68. return 0 if last_value.nil?
  69. (new_value.to_i - last_value.to_i)
  70. end
  71. def calculate_decimal_difference(new_value, dec_pre)
  72. return 0.0 if last_value.nil?
  73. (new_value.to_f - last_value.to_f).round(dec_pre.to_i)
  74. end
  75. def calculate_percentage_change(new_value, dec_pre)
  76. return 0.0 if last_value.nil?
  77. (((new_value.to_f / last_value.to_f) * 100) - 100).round(dec_pre.to_i)
  78. end
  79. def last_value
  80. memory['last_value']
  81. end
  82. def update_memory(new_value)
  83. memory['last_value'] = new_value
  84. end
  85. end
  86. end