# s3_agent.rb

  module Agents
    # Agent that reads from or writes to an Amazon S3 bucket, selected by the
    # `mode` option.
    #
    # * `read`  - on every check the bucket is listed and file pointer events
    #   are emitted: one per object, or (when `watch` is true) one per change
    #   detected by comparing ETags with the listing stored in memory.
    # * `write` - for every received event, the interpolated `data` is stored
    #   in the bucket under the interpolated `filename`.
    class S3Agent < Agent
      include FormConfigurable
      include FileHandling

      emits_file_pointer!
      no_bulk_receive!

      default_schedule 'every_1h'

      gem_dependency_check { defined?(Aws::S3) }

      description do
        <<-MD
          The S3Agent can watch a bucket for changes or emit an event for every file in that bucket. When receiving events, it writes the data into a file on S3.

          #{'## Include `aws-sdk-core` in your Gemfile to use this Agent!' if dependencies_missing?}

          `mode` must be present and either `read` or `write`, in `read` mode the agent checks the S3 bucket for changed files, with `write` it writes received events to a file in the bucket.

          ### Universal options

          To use credentials for the `access_key_id` and `access_key_secret` use the liquid `credential` tag like so `{% credential name-of-credential %}`

          Select the `region` in which the bucket was created.

          ### Reading

          When `watch` is set to `true` the S3Agent will watch the specified `bucket` for changes. An event will be emitted for every detected change.

          When `watch` is set to `false` the agent will emit an event for every file in the bucket on each scheduled run.

          #{emitting_file_handling_agent_description}

          ### Writing

          Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.

          Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
        MD
      end

      event_description do
        example = {
          "file_pointer" => {
            "file" => "filename",
            "agent_id" => id
          }
        }
        # Only watching agents tag their events with the kind of change.
        example["event_type"] = "modified/added/removed" if boolify(interpolated['watch'])

        "Events will look like this:\n\n %s" % Utils.pretty_print(example)
      end

      # Options a freshly created agent starts with.
      def default_options
        {
          'mode' => 'read',
          'access_key_id' => '',
          'access_key_secret' => '',
          'watch' => 'true',
          'bucket' => "",
          'data' => '{{ data }}'
        }
      end

      form_configurable :mode, type: :array, values: %w(read write)
      form_configurable :access_key_id, roles: :validatable
      form_configurable :access_key_secret, roles: :validatable
      form_configurable :region, type: :array, values: %w(us-east-1 us-west-1 us-west-2 eu-west-1 eu-central-1 ap-southeast-1 ap-southeast-2 ap-northeast-1 ap-northeast-2 sa-east-1)
      form_configurable :watch, type: :array, values: %w(true false)
      form_configurable :bucket, roles: :completable
      form_configurable :filename
      form_configurable :data

      # Adds an error to `errors` for every missing or invalid option.
      # `mode`, `bucket` and `region` are always required; `watch` is required
      # in `read` mode, `filename` and `data` in `write` mode.
      def validate_options
        # A blank mode is never in the list, so one membership test covers both cases.
        unless %w(read write).include?(options['mode'])
          errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
        end
        errors.add(:base, "The 'bucket' option is required.") if options['bucket'].blank?
        errors.add(:base, "The 'region' option is required.") if options['region'].blank?

        case interpolated['mode']
        when 'read'
          if options['watch'].blank? || ![true, false].include?(boolify(options['watch']))
            errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'")
          end
        when 'write'
          errors.add(:base, "filename must be specified in 'write' mode") if options['filename'].blank?
          errors.add(:base, "data must be specified in 'write' mode") if options['data'].blank?
        end
      end

      # The key id is considered valid when the bucket list can be fetched with it.
      def validate_access_key_id
        !!buckets
      end

      # The secret is considered valid when the bucket list can be fetched with it.
      def validate_access_key_secret
        !!buckets
      end

      # Completion candidates for the `bucket` form field: one entry per bucket
      # returned by #buckets, or an empty list when they cannot be fetched.
      def complete_bucket
        (buckets || []).map { |bucket| { text: bucket.name, id: bucket.name } }
      end

      def working?
        checked_without_error?
      end

      # Scheduled entry point; does nothing unless the agent is in `read` mode.
      # Lists the bucket and either diffs it against the previous listing
      # (`watch`) or emits a file pointer event for every object.
      def check
        return if interpolated['mode'] != 'read'

        contents = safely { get_bucket_contents }
        # `safely` has already logged the failure; there is nothing to emit
        # and the stored listing must not be replaced.
        return if contents.nil?

        if boolify(interpolated['watch'])
          watch(contents)
        else
          contents.each_key do |key|
            create_event payload: get_file_pointer(key)
          end
        end
      end

      # Returns the body of the object stored under the key `file` in the
      # configured bucket.
      def get_io(file)
        client.get_object(bucket: interpolated['bucket'], key: file).body
      end

      # Event entry point; does nothing unless the agent is in `write` mode.
      # Each event is interpolated and uploaded on its own, so an S3 error for
      # one event is logged without stopping the remaining ones.
      def receive(incoming_events)
        return if interpolated['mode'] != 'write'

        incoming_events.each do |event|
          safely do
            mo = interpolated(event)
            client.put_object(bucket: mo['bucket'], key: mo['filename'], body: mo['data'])
          end
        end
      end

      private

      # Runs the given block and returns its value. S3 service errors are
      # logged through `error` and turned into a `nil` return value so that
      # callers can tell a failed call from a successful one.
      def safely
        yield
      rescue Aws::S3::Errors::AccessDenied => e
        error("Could not access '#{interpolated['bucket']}' #{e.class} #{e.message}")
        nil
      rescue Aws::S3::Errors::ServiceError => e
        error("#{e.class}: #{e.message}")
        nil
      end

      # Compares `contents` (key => etag) with the listing remembered from the
      # previous run and emits `removed`, `modified` and `added` events, then
      # remembers `contents` for the next run. `contents` is not modified.
      def watch(contents)
        seen = memory['seen_contents']

        # Without a previous listing there is nothing to compare against:
        # record the current state as the baseline and emit nothing. This also
        # covers an agent that has run before but never stored a listing.
        if last_check_at.nil? || seen.nil?
          self.memory['seen_contents'] = contents
          return
        end

        seen.each do |key, etag|
          if contents[key].blank?
            create_event payload: get_file_pointer(key).merge(event_type: :removed)
          elsif contents[key] != etag
            create_event payload: get_file_pointer(key).merge(event_type: :modified)
          end
        end

        contents.each_key do |key|
          next if seen.key?(key)

          create_event payload: get_file_pointer(key).merge(event_type: :added)
        end

        self.memory['seen_contents'] = contents
      end

      # Returns a hash mapping every object key in the configured bucket to
      # its ETag.
      def get_bucket_contents
        contents = {}
        # The listing is enumerated response by response (presumably one per
        # result page - see the AWS SDK documentation).
        client.list_objects(bucket: interpolated['bucket']).each do |response|
          response.contents.each do |file|
            contents[file.key] = file.etag
          end
        end
        contents
      end

      # Memoized S3 client built from the interpolated credentials and region.
      def client
        @client ||= Aws::S3::Client.new(
          credentials: Aws::Credentials.new(interpolated['access_key_id'], interpolated['access_key_secret']),
          region: interpolated['region']
        )
      end

      # Returns the buckets listed by the client (memoized on success), or
      # `false` when S3 answers with a service error. The `log` parameter is
      # kept for interface compatibility and is not used.
      def buckets(log = false)
        @buckets ||= client.list_buckets.buckets
      rescue Aws::S3::Errors::ServiceError
        false
      end
    end
  end