# jq_agent.rb
  require 'json'
  require 'open3'
  require 'tempfile'
  module Agents
    # Agent that pipes the payload of each incoming event through the external
    # `jq` command and emits every JSON object the filter produces as an event.
    #
    # The jq executable is read from the USE_JQ environment variable. When the
    # variable is blank, or the command cannot be executed, the agent reports
    # itself as not enabled and refuses to process events.
    class JqAgent < Agent
      cannot_be_scheduled!
      can_dry_run!

      # @return [Boolean] true when a jq version could be determined.
      def self.should_run?
        !!jq_version
      end

      # @return [String, nil] the jq command configured in USE_JQ, or nil when
      #   the variable is unset or blank.
      def self.jq_command
        ENV['USE_JQ'].presence
      end

      # Runs `<jq command> --version` and extracts the version number.
      #
      # @return [String, nil] the text following "jq-" in the command output,
      #   or nil when no command is configured, the command cannot be executed,
      #   or its output does not start with "jq-".
      def self.jq_version
        command = jq_command
        return unless command

        # stderr is discarded; only stdout is inspected.
        output, = Open3.capture2(command, '--version', 2 => IO::NULL)
        output[/\Ajq-\K\S+/]
      rescue SystemCallError
        # USE_JQ names something that cannot be spawned (missing file, no
        # execute permission, ...). Report "not available" instead of raising:
        # this method is reached from the `description` heredoc below, i.e.
        # while the class body itself is being evaluated.
        nil
      end

      # @return [String] a sentence stating the installed jq version, or that
      #   the agent is not enabled on this server.
      def self.jq_info
        if version = jq_version
          "jq version #{version} is installed"
        else
          "**This agent is not enabled on this server**"
        end
      end

      # NOTE(review): `gem_dependency_check` comes from the Agent base class;
      # presumably a falsy block result marks the agent as unavailable - confirm.
      gem_dependency_check { jq_version }

      # The expected-output example lists the second item with the same
      # timestamps as in the input ("1245635279"); the filter leaves them as is.
      description <<~MD
        The Jq Agent allows you to process incoming Events with [jq](https://stedolan.github.io/jq/) the JSON processor. (#{jq_info})

        It allows you to filter, transform and restructure Events in the way you want using jq's powerful features.

        You can specify a jq filter expression to apply to each incoming event in `filter`, and results it produces will become Events to be emitted.

        You can optionally pass in variables to the filter program by specifying key-value pairs of a variable name and an associated value in the `variables` key, each of which becomes a predefined variable.

        This Agent can be used to parse a complex JSON structure that is too hard to handle with JSONPath or Liquid templating.

        For example, suppose that a Post Agent created an Event which contains a `body` key with a value of the JSON formatted string of the following response body:

            {
              "status": "1",
              "since": "1245626956",
              "list": {
                "93817": {
                  "item_id": "93817",
                  "url": "http://url.com",
                  "title": "Page Title",
                  "time_updated": "1245626956",
                  "time_added": "1245626956",
                  "tags": "comma,seperated,list",
                  "state": "0"
                },
                "935812": {
                  "item_id": "935812",
                  "url": "http://google.com",
                  "title": "Google",
                  "time_updated": "1245635279",
                  "time_added": "1245635279",
                  "tags": "comma,seperated,list",
                  "state": "1"
                }
              }
            }

        Then you could have a Jq Agent with the following jq filter:

            .body | fromjson | .list | to_entries | map(.value) | map(try(.tags |= split(",")) // .) | sort_by(.time_added | tonumber)

        To get the following two Events emitted out of the said incoming Event from Post Agent:

            [
              {
                "item_id": "93817",
                "url": "http://url.com",
                "title": "Page Title",
                "time_updated": "1245626956",
                "time_added": "1245626956",
                "tags": ["comma", "seperated", "list"],
                "state": "0"
              },
              {
                "item_id": "935812",
                "url": "http://google.com",
                "title": "Google",
                "time_updated": "1245635279",
                "time_added": "1245635279",
                "tags": ["comma", "seperated", "list"],
                "state": "1"
              }
            ]
      MD

      # Adds validation errors unless `filter` is a String and `variables`,
      # when given, is a Hash.
      def validate_options
        errors.add(:base, "filter needs to be present.") unless options['filter'].is_a?(String)

        if options.key?('variables') && !options['variables'].is_a?(Hash)
          errors.add(:base, "variables must be a hash if present.")
        end
      end

      # @return [Hash] initial options: the identity filter and no variables.
      def default_options
        {
          'filter' => '.',
          'variables' => {}
        }
      end

      # @return [Boolean] true when jq is available and there are no recent
      #   error logs.
      def working?
        self.class.should_run? && !recent_error_logs?
      end

      # Processes each incoming event with option interpolation bound to it.
      # Logs a message and does nothing when jq is not available.
      #
      # @param incoming_events [Enumerable] events to run through jq.
      def receive(incoming_events)
        unless self.class.should_run?
          log("Unable to run because this agent is not enabled. Edit the USE_JQ environment variable.")
          return
        end

        incoming_events.each do |event|
          interpolate_with(event) do
            process_event(event)
          end
        end
      end

      private

      # Builds the variables handed to jq via `--argjson`.
      #
      # @return [Hash{String => String}] variable name => JSON-encoded value;
      #   empty when the interpolated `variables` option is not a Hash.
      def get_variables
        variables = interpolated['variables']
        return {} unless variables.is_a?(Hash)

        variables.to_h { |name, value| [name.to_s, value.to_json] }
      end

      # Runs the interpolated filter against the event payload and creates one
      # event per JSON object in jq's output. Non-object results are reported
      # via `error` and skipped; a failing jq run is reported with its stderr.
      #
      # @param event [#payload] the incoming event.
      def process_event(event)
        filter = interpolated['filter'].to_s
        variables = get_variables

        log [
          "Running jq with filter: #{filter}",
          *variables.map { |name, json| "variable: #{name} = #{json}" }
        ].join("\n")

        results, errout, status = run_jq(filter, variables, event.payload)

        unless status.success?
          error "Error output from jq:\n#{errout}"
          return
        end

        results.keep_if do |result|
          next true if result.is_a?(Hash)

          error "Ignoring a non-object result: #{result.to_json}"
          false
        end

        log "Creating #{results.size} events"
        results.each { |payload| create_event(payload:) }
      end

      # Spawns jq, feeds it the payload as JSON on stdin and collects its output.
      #
      # @param filter [String] jq program text.
      # @param variables [Hash{String => String}] name => JSON text pairs.
      # @param payload [Object] value serialised to jq's stdin.
      # @return [Array(Array, String, Process::Status)] parsed stdout values
      #   (top-level arrays are flattened one level), raw stderr, exit status.
      def run_jq(filter, variables, payload)
        Tempfile.create do |file|
          # There seems to be no way to force jq to treat an arbitrary
          # string as a filter without being confused with a command
          # line option, so pass one via file.
          file.print filter
          file.close

          command_args = [
            self.class.jq_command,
            '--compact-output',
            '--sort-keys',
            '--from-file', file.path,
            *variables.flat_map { |name, json| ['--argjson', name, json] }
          ]

          Open3.popen3(*command_args) do |stdin, stdout, stderr, wait_thread|
            # Drain both pipes concurrently so jq never blocks on a full pipe
            # while we are still writing its input.
            stderr_reader = Thread.new { stderr.read }
            stdout_reader = Thread.new { stdout.each_line.flat_map { |line| JSON.parse(line) } }

            begin
              JSON.dump(payload, stdin)
              stdin.close
            rescue Errno::EPIPE
              # jq exited before consuming its input (e.g. the filter failed
              # to compile). Fall through so its stderr and exit status are
              # still collected and reported by the caller.
            end

            [stdout_reader.value, stderr_reader.value, wait_thread.value]
          end
        end
      end
    end
  end