123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- require 'open3'
- module Agents
- class JqAgent < Agent
- cannot_be_scheduled!
- can_dry_run!
- def self.should_run?
- !!jq_version
- end
- def self.jq_command
- ENV['USE_JQ'].presence
- end
- def self.jq_version
- if command = jq_command
- Open3.capture2(command, '--version', 2 => IO::NULL).first[/\Ajq-\K\S+/]
- end
- end
- def self.jq_info
- if version = jq_version
- "jq version #{version} is installed"
- else
- "**This agent is not enabled on this server**"
- end
- end
- gem_dependency_check { jq_version }
- description <<~MD
- The Jq Agent allows you to process incoming Events with [jq](https://stedolan.github.io/jq/) the JSON processor. (#{jq_info})
- It allows you to filter, transform and restructure Events in the way you want using jq's powerful features.
- You can specify a jq filter expression to apply to each incoming event in `filter`, and results it produces will become Events to be emitted.
- You can optionally pass in variables to the filter program by specifying key-value pairs of a variable name and an associated value in the `variables` key, each of which becomes a predefined variable.
- This Agent can be used to parse a complex JSON structure that is too hard to handle with JSONPath or Liquid templating.
- For example, suppose that a Post Agent created an Event which contains a `body` key with a value of the JSON formatted string of the following response body:
- {
- "status": "1",
- "since": "1245626956",
- "list": {
- "93817": {
- "item_id": "93817",
- "url": "http://url.com",
- "title": "Page Title",
- "time_updated": "1245626956",
- "time_added": "1245626956",
- "tags": "comma,seperated,list",
- "state": "0"
- },
- "935812": {
- "item_id": "935812",
- "url": "http://google.com",
- "title": "Google",
- "time_updated": "1245635279",
- "time_added": "1245635279",
- "tags": "comma,seperated,list",
- "state": "1"
- }
- }
- }
- Then you could have a Jq Agent with the following jq filter:
- .body | fromjson | .list | to_entries | map(.value) | map(try(.tags |= split(",")) // .) | sort_by(.time_added | tonumber)
- To get the following two Events emitted out of the said incoming Event from Post Agent:
- [
- {
- "item_id": "93817",
- "url": "http://url.com",
- "title": "Page Title",
- "time_updated": "1245626956",
- "time_added": "1245626956",
- "tags": ["comma", "seperated", "list"],
- "state": "0"
- },
- {
- "item_id": "935812",
- "url": "http://google.com",
- "title": "Google",
- "time_updated": "1245626956",
- "time_added": "1245626956",
- "tags": ["comma", "seperated", "list"],
- "state": "1"
- }
- ]
- MD
- def validate_options
- errors.add(:base, "filter needs to be present.") if !options['filter'].is_a?(String)
- errors.add(:base,
- "variables must be a hash if present.") if options.key?('variables') && !options['variables'].is_a?(Hash)
- end
- def default_options
- {
- 'filter' => '.',
- 'variables' => {}
- }
- end
- def working?
- self.class.should_run? && !recent_error_logs?
- end
- def receive(incoming_events)
- if !self.class.should_run?
- log("Unable to run because this agent is not enabled. Edit the USE_JQ environment variable.")
- return
- end
- incoming_events.each do |event|
- interpolate_with(event) do
- process_event(event)
- end
- end
- end
- private
- def get_variables
- variables = interpolated['variables']
- return {} if !variables.is_a?(Hash)
- variables.map { |name, value|
- [name.to_s, value.to_json]
- }.to_h
- end
- def process_event(event)
- Tempfile.create do |file|
- filter = interpolated['filter'].to_s
- # There seems to be no way to force jq to treat an arbitrary
- # string as a filter without being confused with a command
- # line option, so pass one via file.
- file.print filter
- file.close
- variables = get_variables
- command_args = [
- self.class.jq_command,
- '--compact-output',
- '--sort-keys',
- '--from-file', file.path,
- *variables.flat_map { |name, json|
- ['--argjson', name, json]
- }
- ]
- log [
- "Running jq with filter: #{filter}",
- *variables.map { |name, json| "variable: #{name} = #{json}" }
- ].join("\n")
- Open3.popen3(*command_args) do |stdin, stdout, stderr, wait_thread|
- stderr_reader = Thread.new { stderr.read }
- stdout_reader = Thread.new { stdout.each_line.flat_map { |line| JSON.parse(line) } }
- results, errout, status =
- begin
- JSON.dump(event.payload, stdin)
- stdin.close
- [
- stdout_reader.value,
- stderr_reader.value,
- wait_thread.value
- ]
- rescue Errno::EPIPE
- end
- if !status.success?
- error "Error output from jq:\n#{errout}"
- return
- end
- results.keep_if do |result|
- if result.is_a?(Hash)
- true
- else
- error "Ignoring a non-object result: #{result.to_json}"
- false
- end
- end
- log "Creating #{results.size} events"
- results.each do |payload|
- create_event(payload:)
- end
- end
- end
- end
- end
- end
|