ftpsite_agent.rb 5.2 KB


  1. require 'uri'
  2. require 'time'
  3. module Agents
  4. class FtpsiteAgent < Agent
  # Declares that this agent does not receive events from other agents.
  cannot_receive_events!

  # Schedule used by default for this agent.
  default_schedule "every_12h"

  # The agent is usable only when both Net::FTP and Net::FTP::List are
  # defined; the description below tells the user to add `net-ftp-list`
  # to the Gemfile when the check fails.
  gem_dependency_check { defined?(Net::FTP) && defined?(Net::FTP::List) }

  # User-facing Markdown help. The heredoc text is kept flush-left so the
  # emitted string is unchanged apart from the "specified" typo fix.
  description <<-MD
The FTP Site Agent checks an FTP site and creates Events based on newly uploaded files in a directory.
#{'## Include `net-ftp-list` in your Gemfile to use this Agent!' if dependencies_missing?}
Specify a `url` that represents a directory of an FTP site to watch, and a list of `patterns` to match against file names.
Login credentials can be included in `url` if authentication is required.
Only files with a last modification time later than the `after` value, if specified, are notified.
  MD

  # Sample of the payload built in `saving_entries` (url, filename, timestamp).
  event_description <<-MD
Events look like this:
{
"url": "ftp://example.org/pub/releases/foo-1.2.tar.gz",
"filename": "foo-1.2.tar.gz",
"timestamp": "2014-04-10T22:50:00Z"
}
  MD
  # Health check: true when an event was created within the configured
  # `expected_update_period_in_days` and there are no recent error logs.
  def working?
    expected_days = interpolated['expected_update_period_in_days']
    event_created_within?(expected_days) && !recent_error_logs?
  end
  # Returns the default option hash: a one-day expected update period, an
  # example directory URL, a single example glob pattern, and `after` set
  # to the current time in ISO 8601 form.
  def default_options
    defaults = {}
    defaults['expected_update_period_in_days'] = "1"
    defaults['url'] = "ftp://example.org/pub/releases/"
    defaults['patterns'] = ['foo-*.tar.gz']
    defaults['after'] = Time.now.iso8601
    defaults
  end
  # Validates the agent options, adding a message to `errors` for each problem.
  #
  # Required:
  # * `url`      - a String that parses as an ftp:// URI naming a directory.
  # * `patterns` - a non-empty Array.
  #
  # Optional (checked only when present):
  # * `after` - must be parseable by Time.parse.
  # * `expected_update_period_in_days` - must pass `is_positive_integer?`.
  def validate_options
    # Check for required fields
    begin
      url = options['url']
      raise ArgumentError, 'url is not a String' unless url.is_a?(String)

      uri = URI(url)
      raise ArgumentError, 'url is not an FTP URL' unless uri.is_a?(URI::FTP)

      # URI::FTP#path drops the slash separating host and path, so the
      # server root ("ftp://host/") has an empty path and must be accepted.
      unless uri.path.empty? || uri.path.end_with?('/')
        errors.add(:base, "url must end with a slash")
      end
    rescue
      # Covers the explicit raises above as well as URI parse failures.
      errors.add(:base, "url must be a valid FTP URL")
    end

    patterns = options['patterns']
    case patterns
    when Array
      errors.add(:base, "patterns must not be empty") if patterns.empty?
    when nil, ''
      errors.add(:base, "patterns must be specified")
    else
      errors.add(:base, "patterns must be an array")
    end

    # Check for optional fields
    # `after` is the option that each_entry reads and default_options sets.
    after = options['after']
    if after.present?
      begin
        Time.parse(after)
      rescue
        errors.add(:base, "after cannot be parsed as time")
      end
    end

    period = options['expected_update_period_in_days']
    if period.present? && !is_positive_integer?(period)
      errors.add(:base, "Invalid expected_update_period_in_days format")
    end
  end
  # Scheduled entry point: hands every entry yielded by `each_entry` to the
  # recorder supplied by `saving_entries`.
  def check
    saving_entries do |found|
      each_entry do |filename, mtime|
        found.call(filename, mtime)
      end
    end
  end
  # Lists the watched FTP directory and yields `filename, mtime` (mtime as a
  # UTC Time) for each entry whose name matches at least one of the
  # configured `patterns` and whose modification time is later than `after`.
  #
  # A missing or blank `after` means "no lower bound" (the epoch is used),
  # matching validate_options, which also treats a blank value as absent.
  def each_entry
    patterns = interpolated['patterns']

    after_option = interpolated['after']
    after = after_option.present? ? Time.parse(after_option) : Time.at(0)

    open_ftp(base_uri) do |ftp|
      log "Listing the directory"
      # Do not use a block style call because we need to call other
      # commands during iteration.
      ftp.list('-a').each do |line|
        entry = Net::FTP::List.parse(line)
        filename = entry.basename

        # Filter on the name first, so that an entry nobody asked for can
        # never abort the run because its timestamp fails to parse.
        next unless patterns.any? { |pattern| File.fnmatch?(pattern, filename) }

        mtime = Time.parse(entry.mtime.to_s).utc
        next unless after < mtime

        yield filename, mtime
      end
    end
  end
  # Connects to the FTP server named by +uri+, logs in, switches to passive
  # mode, changes into the URI's directory and yields the Net::FTP object.
  # The connection is closed when the block finishes or raises.
  #
  # Credentials come from the URI's userinfo (percent-decoded); anonymous
  # login is used when they are absent.
  #
  # Returns the value of the block.
  def open_ftp(uri)
    ftp = Net::FTP.new

    log "Connecting to #{uri.host}#{':%d' % uri.port if uri.port != uri.default_port}"
    ftp.connect(uri.host, uri.port)

    # URI.decode was removed in Ruby 3.0; the parser's unescape performs the
    # same plain percent-decoding (a "+" is left untouched).
    user = uri.user ? URI::DEFAULT_PARSER.unescape(uri.user) : 'anonymous'
    password = uri.password ? URI::DEFAULT_PARSER.unescape(uri.password) : 'anonymous@'
    log "Logging in as #{user}"
    ftp.login(user, password)

    ftp.passive = true

    # A URL pointing at the server root has an empty path; stay in the login
    # directory instead of issuing a chdir with an empty argument.
    path = uri.path.chomp('/')
    unless path.empty?
      log "Changing directory to #{path}"
      ftp.chdir(path)
    end

    yield ftp
  ensure
    # ftp is nil when Net::FTP.new itself raised; calling close on it would
    # replace the real error with a NoMethodError.
    if ftp
      log "Closing the connection"
      ftp.close
    end
  end
  # Returns the URI built from the interpolated `url` option, memoized in
  # @base_uri after the first call.
  def base_uri
    return @base_uri if @base_uri

    @base_uri = URI(interpolated['url'])
  end
  # Yields a recorder proc taking `filename, mtime`. Once the block returns,
  # an event is created for every recorded file that is either absent from
  # memory['known_entries'] or has a newer timestamp than the stored one,
  # in ascending timestamp order. memory['known_entries'] is then replaced
  # with the entries recorded during this run and the agent is saved.
  def saving_entries
    previous = memory['known_entries'] || {}
    current = {}
    fresh = []

    recorder = proc do |filename, mtime|
      stamp = mtime.utc.iso8601
      current[filename] = stamp
      last_seen = previous[filename]
      # ISO 8601 UTC strings order the same way as the times they encode.
      fresh << filename if !last_seen || stamp > last_seen
    end
    yield recorder

    ordered = fresh.sort_by { |filename| current[filename] }
    ordered.each do |filename|
      payload = {
        'url' => (base_uri + uri_path_escape(filename)).to_s,
        'filename' => filename,
        'timestamp' => current[filename],
      }
      create_event payload: payload
    end

    memory['known_entries'] = current
    save!
  end
  154. private
  # True when +value+ converts with Kernel#Integer to a number strictly
  # greater than zero. Zero is rejected: the only caller validates
  # `expected_update_period_in_days`, and a zero-day window is not a usable
  # update period. Anything Integer() cannot convert (nil, non-numeric
  # strings, ...) yields false instead of raising.
  def is_positive_integer?(value)
    Integer(value) > 0
  rescue StandardError
    false
  end
  # Percent-encodes +string+ for use as a URI path segment. The string is
  # treated as raw bytes; every byte outside the set
  # A-Z a-z 0-9 - . _ ~ ! $ & ( ) * + , = @ becomes %XX (upper-case hex).
  # Returns a new US-ASCII string; the argument is not modified.
  def uri_path_escape(string)
    escaped = string.b.gsub(/[^A-Za-z0-9\-._~!$&()*+,=@]+/) do |run|
      run.each_byte.map { |byte| format('%%%02X', byte) }.join
    end
    escaped.force_encoding(Encoding::US_ASCII)
  end
  167. end
  168. end