1
0

csv_agent.rb 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  module Agents
    # Converts between CSV text and structured event data.
    #
    # In `parse` mode the agent reads CSV from each incoming event (from a file
    # pointer when present, otherwise from the value at `data_path`) and emits
    # either one event per row or one event holding all rows. In `serialize` mode
    # it collects the values at `data_path` from every received event and emits a
    # single event whose `data_key` holds the generated CSV string.
    class CsvAgent < Agent
      include FormConfigurable
      include FileHandling

      cannot_be_scheduled!
      consumes_file_pointer!

      def default_options
        {
          'mode' => 'parse',
          'separator' => ',',
          'use_fields' => '',
          'output' => 'event_per_row',
          'with_header' => 'true',
          'data_path' => '$.data',
          'data_key' => 'data'
        }
      end

      description do
        <<-MD
          The `CsvAgent` parses or serializes CSV data. When parsing, events can either be emitted for the entire CSV, or one per row.

          Set `mode` to `parse` to parse CSV from incoming events; when set to `serialize`, the agent serializes the data of events to CSV.

          ### Universal options

          Specify the `separator` which is used to separate the fields from each other (default is `,`). Use `\\t` to separate fields with a tab.

          `data_key` sets the key which contains the serialized CSV or parsed CSV data in emitted events.

          ### Parsing

          If `use_fields` is set to a comma-separated string and the CSV file contains field headers, the agent will only extract the specified fields.

          `output` determines whether one event per row is emitted or one event that includes all the rows.

          Set `with_header` to `true` if the first line of the CSV file contains the field names.

          #{receiving_file_handling_agent_description}

          When receiving the CSV data in a regular event, use [JSONPath](http://goessner.net/articles/JsonPath/) to select the path in `data_path`. `data_path` is only used when the received event does not contain a 'file pointer'.

          ### Serializing

          If `use_fields` is set to a comma-separated string and the first received event has an object at the specified `data_path`, the generated CSV will only include the given fields.

          Set `with_header` to `true` to include a field header in the CSV.

          Use [JSONPath](http://goessner.net/articles/JsonPath/) in `data_path` to select which part of the received events should be serialized.
        MD
      end

      event_description do
        opts = interpolated

        example =
          if opts['mode'] == 'parse'
            rows =
              if boolify(opts['with_header'])
                [
                  { 'column1' => 'row1 value1', 'column2' => 'row1 value2' },
                  { 'column1' => 'row2 value3', 'column2' => 'row2 value4' }
                ]
              else
                [['row1 value1', 'row1 value2'], ['row2 value1', 'row2 value2']]
              end
            # event_per_row emits each row separately; any other output value emits all rows at once.
            Utils.pretty_print(opts['data_key'] => (opts['output'] == 'event_per_row' ? rows[0] : rows))
          else
            Utils.pretty_print(opts['data_key'] => '"generated","csv","data"' + "\n" + '"column1","column2","column3"')
          end

        "Events will look like this:\n\n #{example}"
      end

      form_configurable :mode, type: :array, values: %w(parse serialize)
      form_configurable :separator, type: :string
      form_configurable :data_key, type: :string
      form_configurable :with_header, type: :boolean
      form_configurable :use_fields, type: :string
      form_configurable :output, type: :array, values: %w(event_per_row event_per_file)
      form_configurable :data_path, type: :string

      # Requires `with_header` to be true/false (boolean or string) and, in
      # serialize mode, a `data_path`.
      def validate_options
        with_header = options['with_header']
        # Do not use `blank?` for the presence check: ActiveSupport treats a boolean
        # `false` as blank, which wrongly rejected `with_header: false`.
        if with_header.to_s.strip.empty? || ![true, false].include?(boolify(with_header))
          errors.add(:base, "The 'with_header' options is required and must be set to 'true' or 'false'")
        end

        if options['mode'] == 'serialize' && options['data_path'].blank?
          errors.add(:base, "When mode is set to serialize data_path has to be present.")
        end
      end

      def working?
        received_event_without_error?
      end

      # Dispatches the received batch to the configured mode. Any mode other than
      # 'parse' or 'serialize' is ignored.
      def receive(incoming_events)
        case options['mode']
        when 'parse'
          parse(incoming_events)
        when 'serialize'
          serialize(incoming_events)
        end
      end

      private

      # Serializes the whole batch of events into one CSV string and emits it as
      # a single event. Options are interpolated against the first event only.
      def serialize(incoming_events)
        mo = interpolated(incoming_events.first)
        rows = rows_from_events(incoming_events, mo)
        fields = mo['use_fields'].present? ? extract_options(mo) : nil

        csv = CSV.generate(col_sep: separator(mo), force_quotes: true) do |out|
          # A header row is only possible when rows are hashes (i.e. have field names).
          if boolify(mo['with_header']) && rows.first.is_a?(Hash)
            out << (fields || rows.first.keys)
          end

          rows.each { |row| out << serialize_row(row, fields) }
        end

        create_event payload: { mo['data_key'] => csv }
      end

      # Turns one collected row into the array of cell values written to the CSV.
      # Non-hash rows are passed through unchanged.
      def serialize_row(row, fields)
        return row unless row.is_a?(Hash)

        # values_at keeps columns aligned with the header (missing fields become
        # empty cells) and, unlike `extract!`, does not delete keys from the
        # incoming event's payload.
        fields ? row.values_at(*fields) : row.values
      end

      # Collects the rows to serialize from the value at `data_path` of every
      # event. An array of arrays or hashes contributes each element as a row;
      # any other value contributes a single row.
      def rows_from_events(incoming_events, mo)
        incoming_events.each_with_object([]) do |event, rows|
          data = Utils.value_at(event.payload, mo['data_path'])
          # Nothing at data_path: a nil row would make CSV#<< raise, so skip the event.
          next if data.nil?

          if data.is_a?(Array) && (data[0].is_a?(Array) || data[0].is_a?(Hash))
            rows.concat(data)
          else
            rows << data
          end
        end
      end

      # Parses the CSV carried by each event and emits one event per row or one
      # event holding all rows, depending on `output`.
      def parse(incoming_events)
        incoming_events.each do |event|
          mo = interpolated(event)
          io = local_get_io(event, mo)
          next unless io

          begin
            if mo['output'] == 'event_per_row'
              parse_csv(io, mo) do |payload|
                create_event payload: { mo['data_key'] => payload }
              end
            else
              create_event payload: { mo['data_key'] => parse_csv(io, mo, []) }
            end
          ensure
            # NOTE(review): assumes get_io hands us an IO we own (e.g. a freshly
            # opened file); plain strings from data_path do not respond to close.
            io.close if io.respond_to?(:close)
          end
        end
      end

      # Returns the CSV source for an event: the IO behind its file pointer if
      # any, otherwise the value at `data_path` (interpolated for this event).
      def local_get_io(event, mo = interpolated(event))
        get_io(event) || Utils.value_at(event.payload, mo['data_path'])
      end

      # Builds the keyword options passed to CSV.new.
      def parse_csv_options(mo)
        opts = {
          col_sep: separator(mo),
          headers: boolify(mo['with_header'])
        }
        # liberal_parsing only exists in newer CSV versions; enable it when available.
        opts[:liberal_parsing] = true if CSV::DEFAULT_OPTIONS.key?(:liberal_parsing)
        opts
      end

      # Parses `io` row by row. Yields each row payload when a block is given;
      # otherwise appends the payloads to `array` and returns it.
      def parse_csv(io, mo, array = nil)
        CSV.new(io, **parse_csv_options(mo)).each do |row|
          payload = get_payload(row, mo)
          if block_given?
            yield payload
          else
            array << payload
          end
        end
        array
      end

      # Translates the literal two-character string `\t` into a tab character.
      def separator(mo)
        mo['separator'] == '\\t' ? "\t" : mo['separator']
      end

      # Converts a parsed row into an event payload: a hash (optionally limited
      # to `use_fields`) when headers are enabled, otherwise the raw row array.
      def get_payload(row, mo)
        return row unless boolify(mo['with_header'])

        hash = row.to_hash
        mo['use_fields'].present? ? hash.extract!(*extract_options(mo)) : hash
      end

      # Splits the comma-separated `use_fields` option into stripped field names.
      def extract_options(mo)
        mo['use_fields'].split(',').map(&:strip)
      end
    end
  end