1
0

ftpsite_agent.rb 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. require 'uri'
  2. require 'time'
  3. module Agents
  4. class FtpsiteAgent < Agent
  # Project mixin; presumably the source of get_file_pointer and
  # emitting_file_handling_agent_description used by this class -- confirm.
  include(FileHandling)

  # Schedule applied to the agent unless another one is chosen.
  default_schedule('every_12h')

  # The dependency check passes only when both Net::FTP and the net-ftp-list
  # parser (Net::FTP::List) are loaded.
  gem_dependency_check do
    defined?(Net::FTP) && defined?(Net::FTP::List)
  end

  # NOTE(review): declared elsewhere in the project; presumably marks the
  # emitted events as carrying a file pointer -- confirm.
  emits_file_pointer!
  # User-facing Markdown documentation for the agent. It is built in a block
  # so the dependency hint and the shared file-handling text are evaluated
  # when the description is requested. Paragraphs and headings are separated
  # by blank lines so Markdown renders them as distinct blocks.
  description do
    <<~MD
      The Ftp Site Agent checks an FTP site and creates Events based on newly uploaded files in a directory. When receiving events it creates files on the configured FTP server.

      #{'## Include `net-ftp-list` in your Gemfile to use this Agent!' if dependencies_missing?}

      `mode` must be present and either `read` or `write`, in `read` mode the agent checks the FTP site for changed files, with `write` it writes received events to a file on the server.

      ### Universal options

      Specify a `url` that represents a directory of an FTP site to watch, and a list of `patterns` to match against file names.

      Login credentials can be included in `url` if authentication is required: `ftp://username:password@ftp.example.com/path`. Liquid formatting is supported as well: `ftp://{% credential ftp_credentials %}@ftp.example.com/`

      Optionally specify the encoding of the files you want to read/write in `force_encoding`, by default UTF-8 is used.

      ### Reading

      Only files with a last modification time later than the `after` value, if specified, are emitted as events.

      ### Writing

      Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.

      Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.

      #{emitting_file_handling_agent_description}
    MD
  end
  # Sample of the payload emitted in read mode (see the keys built in
  # saving_entries). The JSON is indented by four spaces after a blank line so
  # Markdown renders it as a code block instead of a run-on paragraph.
  event_description <<~MD
    Events look like this:

        {
          "url": "ftp://example.org/pub/releases/foo-1.2.tar.gz",
          "filename": "foo-1.2.tar.gz",
          "timestamp": "2014-04-10T22:50:00Z"
        }
  MD
  # Health check for the agent.
  #
  # In any mode other than 'read' the agent is healthy when
  # received_event_without_error? says so. In 'read' mode it must have created
  # an event within `expected_update_period_in_days` and have no recent error
  # logs.
  def working?
    return received_event_without_error? unless interpolated['mode'] == 'read'

    period = interpolated['expected_update_period_in_days']
    event_created_within?(period) && !recent_error_logs?
  end
  # Options pre-filled for a newly created agent: read mode, watching an
  # example directory for one example pattern, with `after` set to the moment
  # this method is called (ISO 8601).
  def default_options
    created_at = Time.now.iso8601
    example_patterns = ['foo-*.tar.gz']

    {
      'mode' => 'read',
      'expected_update_period_in_days' => '1',
      'url' => 'ftp://example.org/pub/releases/',
      'patterns' => example_patterns,
      'after' => created_at,
      'force_encoding' => '',
      'filename' => '',
      'data' => '{{ data }}'
    }
  end
  # Validates the agent options and records every problem on `errors`.
  #
  # Checks, in order: `url` (a valid FTP URL whose path ends with a slash),
  # `mode` ('read' or 'write'), the mode-specific options (`patterns` for
  # read; `filename` and `data` for write), and the optional `after`,
  # `timestamp` and `expected_update_period_in_days` values.
  def validate_options
    # Check for required fields
    begin
      # A URL containing Liquid output tags can only be resolved per event,
      # so it is not checked here. A missing url raises on `include?` and is
      # reported through the rescue below.
      unless options['url'].include?('{{')
        url = interpolated['url']
        raise unless url.is_a?(String)

        uri = URI(url)
        raise unless uri.is_a?(URI::FTP)

        errors.add(:base, "url must end with a slash") if uri.path.present? && !uri.path.end_with?('/')
      end
    rescue StandardError
      errors.add(:base, "url must be a valid FTP URL")
    end

    # New records default to read mode.
    options['mode'] = 'read' if options['mode'].blank? && new_record?

    if options['mode'].blank? || !['read', 'write'].include?(options['mode'])
      errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
    end

    case interpolated['mode']
    when 'read'
      patterns = options['patterns']
      case patterns
      when Array
        errors.add(:base, "patterns must not be empty") if patterns.empty?
      when nil, ''
        errors.add(:base, "patterns must be specified")
      else
        errors.add(:base, "patterns must be an array")
      end
    when 'write'
      errors.add(:base, "filename must be specified in 'write' mode") if options['filename'].blank?
      errors.add(:base, "data must be specified in 'write' mode") if options['data'].blank?
    end

    # Check for optional fields

    # `after` is the option each_entry parses with Time.parse, so reject
    # values that would make every check fail. As with `url`, a value with
    # Liquid output tags is skipped.
    after = options['after']
    if after.present? && !after.to_s.include?('{{')
      begin
        resolved = interpolated['after']
        Time.parse(resolved) if resolved.present?
      rescue StandardError
        errors.add(:base, "after cannot be parsed as time")
      end
    end

    # `timestamp` is still validated for backward compatibility.
    if (timestamp = options['timestamp']).present?
      begin
        Time.parse(timestamp)
      rescue StandardError
        errors.add(:base, "timestamp cannot be parsed as time")
      end
    end

    period = options['expected_update_period_in_days']
    if period.present? && !is_positive_integer?(period)
      errors.add(:base, "Invalid expected_update_period_in_days format")
    end
  end
  # Scheduled entry point. Does nothing unless the agent is in 'read' mode;
  # otherwise every entry yielded by each_entry is handed to the recorder
  # that saving_entries supplies.
  def check
    return unless interpolated['mode'] == 'read'

    saving_entries do |record|
      each_entry do |name, modified_at|
        record.call(name, modified_at)
      end
    end
  end
  # Write-mode entry point: uploads one file per incoming event.
  #
  # For each event the options are interpolated against it; `data` is
  # transcoded in place to `force_encoding` when that option is set (bytes
  # that cannot be converted are replaced), and the result is stored under
  # `filename` over a fresh FTP connection. Does nothing unless the agent is
  # in 'write' mode.
  def receive(incoming_events)
    return unless interpolated['mode'] == 'write'

    incoming_events.each do |event|
      rendered = interpolated(event)

      if interpolated['force_encoding'].present?
        rendered['data'].encode!(interpolated['force_encoding'], invalid: :replace, undef: :replace)
      end

      open_ftp(base_uri) do |connection|
        command = "STOR #{rendered['filename']}"
        connection.storbinary(command, StringIO.new(rendered['data']), Net::FTP::DEFAULT_BLOCKSIZE)
      end
    end
  end
  # Lists the configured FTP directory and yields `filename, mtime` for each
  # entry whose name matches one of the `patterns` globs and whose
  # modification time (converted to UTC) is later than the `after` option.
  #
  # A missing or blank `after` means "no lower bound". It is checked for
  # blankness first because Time.parse raises on an empty string.
  def each_entry
    patterns = interpolated['patterns']

    after_option = interpolated['after'].to_s.strip
    after = after_option.empty? ? Time.at(0) : Time.parse(after_option)

    open_ftp(base_uri) do |ftp|
      log "Listing the directory"
      # Do not use a block style call because we need to call other
      # commands during iteration.
      ftp.list('-a').each do |line|
        entry = Net::FTP::List.parse(line)
        filename = entry.basename

        # Filter by name before touching the mtime, so a listing line with no
        # usable timestamp only matters when its name is actually wanted.
        next unless patterns.any? { |pattern| File.fnmatch?(pattern, filename) }

        mtime = Time.parse(entry.mtime.to_s).utc
        next unless after < mtime

        yield filename, mtime
      end
    end
  end
  # Connects to the FTP server described by `uri`, logs in, switches to
  # passive mode, changes into the URI's directory (if any) and yields the
  # connection. The connection is closed when the block finishes or raises.
  #
  # Credentials come from the URI's userinfo and fall back to an anonymous
  # login. Returns the value of the block.
  def open_ftp(uri)
    ftp = Net::FTP.new

    log "Connecting to #{uri.host}#{':%d' % uri.port if uri.port != uri.default_port}"
    ftp.connect(uri.host, uri.port)

    # Userinfo is percent-encoded (RFC 3986), so only %XX escapes are
    # decoded. CGI.unescape would apply form decoding and turn a literal '+'
    # in a user name or password into a space.
    user = uri.user ? URI::DEFAULT_PARSER.unescape(uri.user) : 'anonymous'
    password = uri.password ? URI::DEFAULT_PARSER.unescape(uri.password) : 'anonymous@'

    log "Logging in as #{user}"
    ftp.login(user, password)
    ftp.passive = true

    path = uri.path.chomp('/')
    unless path.empty?
      log "Changing directory to #{path}"
      ftp.chdir(path)
    end

    yield ftp
  ensure
    log "Closing the connection"
    # `ftp` is nil if constructing the client itself failed.
    ftp&.close
  end
  # The interpolated `url` option parsed into a URI. The first result is
  # cached in @base_uri and reused by later calls.
  def base_uri
    return @base_uri if @base_uri

    @base_uri = URI(interpolated['url'])
  end
  # Yields a recorder proc taking `filename, mtime`, then emits one event per
  # file that is new or has a later modification time than the one remembered
  # in memory['known_entries']. Events are created in ascending timestamp
  # order. Afterwards the remembered entries are replaced by the files seen
  # in this run and the agent is saved.
  def saving_entries
    previous = memory['known_entries'] || {}
    current = {}
    changed = []

    recorder = proc do |filename, mtime|
      stamp = mtime.utc.iso8601
      current[filename] = stamp

      # ISO 8601 UTC strings of the same shape order chronologically.
      remembered = previous[filename]
      changed << filename if !remembered || stamp > remembered
    end
    yield recorder

    ordered = changed.sort_by { |filename| current[filename] }
    ordered.each do |filename|
      details = {
        'url' => (base_uri + uri_path_escape(filename)).to_s,
        'filename' => filename,
        'timestamp' => current[filename],
      }
      create_event payload: get_file_pointer(filename).merge(details)
    end

    memory['known_entries'] = current
    save!
  end
  # Downloads `file` from the FTP directory into memory and returns a
  # StringIO rewound to the start. Each received chunk is tagged (not
  # transcoded) with the `force_encoding` option, or UTF-8 when that option
  # is blank, before it is appended.
  def get_io(file)
    buffer = StringIO.new

    open_ftp(base_uri) do |connection|
      connection.getbinaryfile(file, nil) do |chunk|
        encoding = options['force_encoding'].presence || 'UTF-8'
        buffer.write(chunk.force_encoding(encoding))
      end
    end

    buffer.rewind
    buffer
  end
  216. private
  # Percent-encodes `string` for use as a URI path segment. Works on a binary
  # copy of the input, so every byte outside the allowed set (ASCII letters,
  # digits and -._~!$&()*+,=@) becomes an upper-case %XX escape. Returns a new
  # US-ASCII string and leaves the argument untouched.
  def uri_path_escape(string)
    escaped = string.b.gsub(/([^A-Za-z0-9\-._~!$&()*+,=@]+)/) do |run|
      run.each_byte.map { |byte| format('%%%02X', byte) }.join
    end

    escaped.force_encoding(Encoding::US_ASCII)
  end
  224. end
  225. end