csv_agent.rb 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  module Agents
    # Parses CSV carried by incoming events (inline at `data_path`, or through a
    # file pointer) into events, or serializes the data of incoming events into
    # a single CSV document.
    class CsvAgent < Agent
      include FormConfigurable
      include FileHandling

      cannot_be_scheduled!
      consumes_file_pointer!

      def default_options
        {
          'mode' => 'parse',
          'separator' => ',',
          'use_fields' => '',
          'output' => 'event_per_row',
          'with_header' => 'true',
          'data_path' => '$.data',
          'data_key' => 'data'
        }
      end

      description do
        <<~MD
          The `CsvAgent` parses or serializes CSV data. When parsing, events can either be emitted for the entire CSV, or one per row.

          Set `mode` to `parse` to parse CSV from incoming events; when set to `serialize` the agent serializes the data of events to CSV.

          ### Universal options

          Specify the `separator` which is used to separate the fields from each other (default is `,`).

          `data_key` sets the key which contains the serialized CSV or parsed CSV data in emitted events.

          ### Parsing

          If `use_fields` is set to a comma-separated string and the CSV file contains field headers the agent will only extract the specified fields.

          `output` determines whether one event per row is emitted or one event that includes all the rows.

          Set `with_header` to `true` if the first line of the CSV file contains the field names.

          #{receiving_file_handling_agent_description}

          When receiving the CSV data in a regular event use [JSONPath](http://goessner.net/articles/JsonPath/) to select the path in `data_path`. `data_path` is only used when the received event does not contain a 'file pointer'.

          ### Serializing

          If `use_fields` is set to a comma-separated string and the first received event has an object at the specified `data_path` the generated CSV will only include the given fields.

          Set `with_header` to `true` to include a field header in the CSV.

          Use [JSONPath](http://goessner.net/articles/JsonPath/) in `data_path` to select which part of the received events should be serialized.
        MD
      end

      event_description do
        # Sample payload that mirrors the currently configured mode/output/header options.
        data =
          if interpolated['mode'] == 'parse'
            rows =
              if boolify(interpolated['with_header'])
                [
                  { 'column1' => 'row1 value1', 'column2' => 'row1 value2' },
                  { 'column1' => 'row2 value3', 'column2' => 'row2 value4' },
                ]
              else
                [
                  ['row1 value1', 'row1 value2'],
                  ['row2 value1', 'row2 value2'],
                ]
              end
            interpolated['output'] == 'event_per_row' ? rows[0] : rows
          else
            <<~EOS
              "generated","csv","data"
              "column1","column2","column3"
            EOS
          end

        "Events will look like this:\n\n " +
          Utils.pretty_print({ interpolated['data_key'] => data })
      end

      form_configurable :mode, type: :array, values: %w[parse serialize]
      form_configurable :separator, type: :string
      form_configurable :data_key, type: :string
      form_configurable :with_header, type: :boolean
      form_configurable :use_fields, type: :string
      form_configurable :output, type: :array, values: %w[event_per_row event_per_file]
      form_configurable :data_path, type: :string

      # `with_header` must boolify to true/false; serialize mode needs a `data_path`.
      def validate_options
        if options['with_header'].blank? || ![true, false].include?(boolify(options['with_header']))
          errors.add(:base, "The 'with_header' options is required and must be set to 'true' or 'false'")
        end
        if options['mode'] == 'serialize' && options['data_path'].blank?
          errors.add(:base, "When mode is set to serialize data_path has to be present.")
        end
      end

      def working?
        received_event_without_error?
      end

      # Dispatches on the raw (non-interpolated) `mode` option. Any value other
      # than 'parse' or 'serialize' makes this a no-op.
      def receive(incoming_events)
        case options['mode']
        when 'parse'
          parse(incoming_events)
        when 'serialize'
          serialize(incoming_events)
        end
      end

      private

      # Serializes the data of all incoming events into ONE CSV document and
      # emits it under `data_key`. Options are interpolated against the first
      # event only and applied to the whole batch.
      def serialize(incoming_events)
        mo = interpolated(incoming_events.first)
        rows = rows_from_events(incoming_events, mo)
        fields = mo['use_fields'].present? ? extract_options(mo) : nil

        csv = CSV.generate(col_sep: separator(mo), force_quotes: true) do |out|
          # A header row is only written when the first row is a Hash (has keys).
          if boolify(mo['with_header']) && rows.first.is_a?(Hash)
            out << (fields || rows.first.keys)
          end
          rows.each { |data| out << serialize_row(data, fields) }
        end

        create_event payload: { mo['data_key'] => csv }
      end

      # Turns one row into the array of cell values written to the CSV.
      # Hash rows are projected onto `fields` when given. Missing keys become nil,
      # so columns stay aligned with the header. The hash is read, never mutated,
      # because it belongs to the incoming event's payload.
      # Non-Hash rows are passed through unchanged.
      def serialize_row(data, fields)
        return data unless data.is_a?(Hash)

        fields ? data.values_at(*fields) : data.values
      end

      # Collects the rows to serialize from each event's `data_path` value.
      # An array of arrays/hashes contributes each element as its own row.
      # Any other value is appended as a single row.
      def rows_from_events(incoming_events, mo)
        incoming_events.each_with_object([]) do |event, rows|
          data = Utils.value_at(event.payload, mo['data_path'])
          if data.is_a?(Array) && (data[0].is_a?(Array) || data[0].is_a?(Hash))
            rows.concat(data)
          else
            rows << data
          end
        end
      end

      # Parses the CSV of each event. It emits one event per row for
      # `event_per_row`; otherwise it emits one event holding every parsed row.
      # Events that yield no CSV source are skipped.
      def parse(incoming_events)
        incoming_events.each do |event|
          mo = interpolated(event)
          io = local_get_io(event, mo)
          next unless io

          if mo['output'] == 'event_per_row'
            parse_csv(io, mo) { |payload| create_event payload: { mo['data_key'] => payload } }
          else
            create_event payload: { mo['data_key'] => parse_csv(io, mo, []) }
          end
        end
      end

      # Returns the CSV source for an event. It prefers `get_io` (file-pointer
      # handling from FileHandling) and otherwise falls back to the value at
      # `data_path`. `mo` should be the options interpolated for this event, so
      # an event-dependent `data_path` resolves the same way as in `parse`.
      def local_get_io(event, mo = interpolated(event))
        get_io(event) || Utils.value_at(event.payload, mo['data_path'])
      end

      # Options for CSV.new. `liberal_parsing` is enabled only when the loaded
      # CSV library knows that option.
      def parse_csv_options(mo)
        options = {
          col_sep: separator(mo),
          headers: boolify(mo['with_header']),
        }
        options[:liberal_parsing] = true if CSV::DEFAULT_OPTIONS.key?(:liberal_parsing)
        options
      end

      # Iterates the parsed rows of `io`. With a block, each row payload is
      # yielded; otherwise it is appended to `array`, which is returned.
      def parse_csv(io, mo, array = nil)
        CSV.new(io, **parse_csv_options(mo)).each do |row|
          if block_given?
            yield get_payload(row, mo)
          else
            array << get_payload(row, mo)
          end
        end
        array
      end

      # The literal two-character string `\t` in the `separator` option stands
      # for a tab.
      def separator(mo)
        mo['separator'] == '\\t' ? "\t" : mo['separator']
      end

      # With headers, a row becomes a Hash (restricted to `use_fields` when set);
      # without headers, the row is returned as parsed by CSV.
      def get_payload(row, mo)
        return row unless boolify(mo['with_header'])

        hash = row.to_hash
        mo['use_fields'].present? ? hash.extract!(*extract_options(mo)) : hash
      end

      # Splits the comma-separated `use_fields` option into stripped field names.
      def extract_options(mo)
        mo['use_fields'].split(',').map(&:strip)
      end
    end
  end