123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- require 'uri'
- require 'time'
- module Agents
- class FtpsiteAgent < Agent
- include FileHandling
- default_schedule "every_12h"
- gem_dependency_check { defined?(Net::FTP) && defined?(Net::FTP::List) }
- emits_file_pointer!
- description do
- <<~MD
- The Ftp Site Agent checks an FTP site and creates Events based on newly uploaded files in a directory. When receiving events it creates files on the configured FTP server.
- #{'## Include `net-ftp-list` in your Gemfile to use this Agent!' if dependencies_missing?}
- `mode` must be present and either `read` or `write`, in `read` mode the agent checks the FTP site for changed files, with `write` it writes received events to a file on the server.
- ### Universal options
- Specify a `url` that represents a directory of an FTP site to watch, and a list of `patterns` to match against file names.
- Login credentials can be included in `url` if authentication is required: `ftp://username:password@ftp.example.com/path`. Liquid formatting is supported as well: `ftp://{% credential ftp_credentials %}@ftp.example.com/`
- Optionally specify the encoding of the files you want to read/write in `force_encoding`, by default UTF-8 is used.
- ### Reading
- Only files with a last modification time later than the `after` value, if specified, are emitted as events.
- ### Writing
- Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.
- Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
- #{emitting_file_handling_agent_description}
- MD
- end
- event_description <<~MD
- Events look like this:
- {
- "url": "ftp://example.org/pub/releases/foo-1.2.tar.gz",
- "filename": "foo-1.2.tar.gz",
- "timestamp": "2014-04-10T22:50:00Z"
- }
- MD
# Health check for the agent.
#
# In `read` mode the agent counts as working when an event was created
# within `expected_update_period_in_days` and there are no recent error
# logs. In any other mode it defers to `received_event_without_error?`.
#
# @return [Boolean] truthy when the agent is considered healthy
def working?
  return received_event_without_error? unless interpolated['mode'] == 'read'

  update_period = interpolated['expected_update_period_in_days']
  event_created_within?(update_period) && !recent_error_logs?
end
# Options a freshly created agent starts with: `read` mode against an
# example URL, one example glob pattern, and `after` set to the current
# time in ISO 8601 form.
#
# @return [Hash{String => Object}] the default option set
def default_options
  created_at = Time.now.iso8601
  example_patterns = ['foo-*.tar.gz']

  {
    'mode' => 'read',
    'expected_update_period_in_days' => '1',
    'url' => 'ftp://example.org/pub/releases/',
    'patterns' => example_patterns,
    'after' => created_at,
    'force_encoding' => '',
    'filename' => '',
    'data' => '{{ data }}'
  }
end
# Validates the agent options, adding one message to `errors` per problem.
#
# Checks performed:
# * `url` must be a string that parses as an FTP URI whose path, when
#   present, ends with a slash (skipped when the raw option contains `{{`).
# * `mode` must be `read` or `write` (defaulted to `read` on new records).
# * `read` mode: `patterns` must be a non-empty array, and `after`, when
#   given, must be parseable as a time.
# * `write` mode: `filename` and `data` must be present.
# * `timestamp` (still accepted for backward compatibility) must be
#   parseable as a time when present.
# * `expected_update_period_in_days` must be a positive integer when present.
def validate_options
  # Check for required fields
  begin
    # A templated url is only known after interpolation, so it is not checked.
    unless options['url'].include?('{{')
      url = interpolated['url']
      raise ArgumentError, 'url is not a string' unless url.is_a?(String)

      uri = URI(url)
      raise ArgumentError, 'url is not an FTP URI' unless uri.is_a?(URI::FTP)

      errors.add(:base, "url must end with a slash") if uri.path.present? && !uri.path.end_with?('/')
    end
  rescue StandardError
    # Any failure above (missing url, unparseable URI, wrong scheme) is
    # reported with the same message.
    errors.add(:base, "url must be a valid FTP URL")
  end

  options['mode'] = 'read' if options['mode'].blank? && new_record?
  if options['mode'].blank? || !['read', 'write'].include?(options['mode'])
    errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
  end

  case interpolated['mode']
  when 'read'
    patterns = options['patterns']
    case patterns
    when Array
      errors.add(:base, "patterns must not be empty") if patterns.empty?
    when nil, ''
      errors.add(:base, "patterns must be specified")
    else
      errors.add(:base, "patterns must be an array")
    end

    # `after` is the value each_entry hands to Time.parse, so reject values
    # that cannot be parsed here instead of failing on every check run.
    # Liquid templates are resolved at run time and cannot be checked yet.
    after = options['after']
    templated = after.is_a?(String) && (after.include?('{{') || after.include?('{%'))
    if after.present? && !templated
      begin
        Time.parse(after)
      rescue StandardError
        errors.add(:base, "after cannot be parsed as time")
      end
    end
  when 'write'
    errors.add(:base, "filename must be specified in 'write' mode") if options['filename'].blank?
    errors.add(:base, "data must be specified in 'write' mode") if options['data'].blank?
  end

  # Check for optional fields
  if (timestamp = options['timestamp']).present?
    begin
      Time.parse(timestamp)
    rescue StandardError
      errors.add(:base, "timestamp cannot be parsed as time")
    end
  end

  if options['expected_update_period_in_days'].present? &&
     !is_positive_integer?(options['expected_update_period_in_days'])
    errors.add(:base, "Invalid expected_update_period_in_days format")
  end
end
# Scheduled entry point. Does nothing unless the agent is in `read` mode;
# otherwise every entry yielded by `each_entry` is handed to the recorder
# that `saving_entries` provides.
def check
  return unless interpolated['mode'] == 'read'

  saving_entries do |record|
    each_entry do |filename, mtime|
      record.call(filename, mtime)
    end
  end
end
# Writes each incoming event to the FTP server. Only active in `write` mode.
#
# For every event the options are interpolated against that event; the
# resulting `data` is stored under the resulting `filename`. When
# `force_encoding` is set, `data` is transcoded to it first, replacing
# invalid or undefined characters. A connection is opened per event.
#
# @param incoming_events [Enumerable] events to upload
def receive(incoming_events)
  return unless interpolated['mode'] == 'write'

  incoming_events.each do |event|
    mo = interpolated(event)

    if interpolated['force_encoding'].present?
      target_encoding = interpolated['force_encoding']
      mo['data'].encode!(target_encoding, invalid: :replace, undef: :replace)
    end

    open_ftp(base_uri) do |ftp|
      upload = StringIO.new(mo['data'])
      ftp.storbinary("STOR #{mo['filename']}", upload, Net::FTP::DEFAULT_BLOCKSIZE)
    end
  end
end
# Lists the configured FTP directory and yields every entry whose name
# matches one of `patterns` and whose modification time is later than
# `after`.
#
# @yieldparam filename [String] base name of the matching entry
# @yieldparam mtime [Time] modification time of the entry, in UTC
def each_entry
  patterns = interpolated['patterns']

  # A blank `after` (nil or an empty string) means "no lower bound".
  # Time.parse raises on an empty string, so it must not be passed through.
  after_option = interpolated['after']
  after = after_option.present? ? Time.parse(after_option) : Time.at(0)

  open_ftp(base_uri) do |ftp|
    log "Listing the directory"
    # Do not use a block style call because we need to call other
    # commands during iteration.
    ftp.list('-a').each do |line|
      entry = Net::FTP::List.parse line
      filename = entry.basename

      # Filter on the name first, so the mtime of entries we are not
      # interested in is never parsed and cannot abort the listing.
      next unless patterns.any? { |pattern| File.fnmatch?(pattern, filename) }

      mtime = Time.parse(entry.mtime.to_s).utc
      next unless after < mtime

      yield filename, mtime
    end
  end
end
# Opens an FTP session for +uri+, yields the connected client, and closes
# the connection afterwards whether or not the block raised.
#
# Credentials come from the URI's userinfo (percent-decoded with
# CGI.unescape); without them an anonymous login is used. Passive mode is
# switched on, and when the URI has a path the session changes into it
# before yielding.
#
# @param uri [URI::FTP] location to connect to
# @yieldparam ftp [Net::FTP] the logged-in client
# @return the value of the block
def open_ftp(uri)
  ftp = Net::FTP.new

  # Only mention the port in the log when it is not the scheme default.
  port_suffix = uri.port == uri.default_port ? nil : ':%d' % uri.port
  log "Connecting to #{uri.host}#{port_suffix}"
  ftp.connect(uri.host, uri.port)

  raw_user = uri.user
  user = raw_user ? CGI.unescape(raw_user) : 'anonymous'
  raw_password = uri.password
  password = raw_password ? CGI.unescape(raw_password) : 'anonymous@'

  log "Logging in as #{user}"
  ftp.login(user, password)
  ftp.passive = true

  path = uri.path.chomp('/')
  if path.present?
    log "Changing directory to #{path}"
    ftp.chdir(path)
  end

  yield ftp
ensure
  log "Closing the connection"
  ftp.close
end
# The interpolated `url` option parsed into a URI. The result is cached in
# @base_uri, so later calls return the same object.
#
# @return [URI::Generic] parsed form of the configured url
def base_uri
  return @base_uri if @base_uri

  @base_uri = URI(interpolated['url'])
end
# Tracks which files are new since the previous run and emits events for them.
#
# Yields a recorder proc taking (filename, mtime). Each recorded file is
# stored with its mtime as a UTC ISO 8601 string; it counts as new when
# memory['known_entries'] has no entry for it or only an older timestamp.
# After the block returns, one event per new file is created, ordered by
# timestamp, and the recorded set replaces memory['known_entries'] before
# the agent is saved.
#
# @yieldparam recorder [Proc] call with (filename, mtime) for every file found
def saving_entries
  previously_known = memory['known_entries'] || {}
  seen_now = {}
  fresh = []

  recorder = proc do |filename, mtime|
    stamp = mtime.utc.iso8601
    seen_now[filename] = stamp
    last_stamp = previously_known[filename]
    # ISO 8601 UTC strings order the same way as the times they encode.
    fresh << filename unless last_stamp && stamp <= last_stamp
  end

  yield recorder

  fresh.sort_by { |name| seen_now[name] }.each do |name|
    details = {
      'url' => (base_uri + uri_path_escape(name)).to_s,
      'filename' => name,
      'timestamp' => seen_now[name],
    }
    create_event payload: get_file_pointer(name).merge(details)
  end

  memory['known_entries'] = seen_now
  save!
end
# Downloads +file+ from the FTP server into memory.
#
# Each received chunk is tagged with the `force_encoding` option (UTF-8
# when that option is blank) before being appended to the buffer.
#
# @param file [String] remote file name, relative to the configured directory
# @return [StringIO] buffer holding the file contents, rewound to the start
def get_io(file)
  StringIO.new.tap do |buffer|
    open_ftp(base_uri) do |ftp|
      # nil as the local file name: chunks are only passed to the block.
      ftp.getbinaryfile(file, nil) do |chunk|
        encoding = options['force_encoding'].presence || 'UTF-8'
        buffer.write(chunk.force_encoding(encoding))
      end
    end
    buffer.rewind
  end
end
- private
# Percent-encodes +string+ for use as a URI path segment.
#
# Works byte-wise on a binary copy of the input: every byte outside
# A-Z a-z 0-9 - . _ ~ ! $ & ( ) * + , = @ becomes %XX with uppercase hex
# digits, so "/" and multi-byte characters are escaped too.
#
# @param string [String] raw file name
# @return [String] the escaped name, tagged as US-ASCII
def uri_path_escape(string)
  escaped = string.b.gsub(/[^A-Za-z0-9\-._~!$&()*+,=@]/) do |byte|
    format('%%%02X', byte.ord)
  end
  escaped.force_encoding(Encoding::US_ASCII)
end
- end
- end
|