# s3_agent.rb

  module Agents
    # Huginn agent that reads from or writes to an Amazon S3 bucket.
    #
    # In `read` mode a scheduled check lists the bucket and emits file-pointer
    # events: one per object on every run, or only for detected changes when
    # `watch` is enabled. In `write` mode every received event is uploaded to
    # the bucket as an object.
    class S3Agent < Agent
      include FormConfigurable
      include FileHandling

      emits_file_pointer!
      no_bulk_receive!

      default_schedule 'every_1h'

      gem_dependency_check { defined?(Aws::S3) }

      description do
        <<~MD
          The S3Agent can watch a bucket for changes or emit an event for every file in that bucket. When receiving events, it writes the data into a file on S3.

          #{'## Include `aws-sdk-core` in your Gemfile to use this Agent!' if dependencies_missing?}

          `mode` must be present and either `read` or `write`, in `read` mode the agent checks the S3 bucket for changed files, with `write` it writes received events to a file in the bucket.

          ### Universal options

          To use credentials for the `access_key_id` and `access_key_secret` use the liquid `credential` tag like so `{% credential name-of-credential %}`

          Select the `region` in which the bucket was created.

          ### Reading

          When `watch` is set to `true` the S3Agent will watch the specified `bucket` for changes. An event will be emitted for every detected change.

          When `watch` is set to `false` the agent will emit an event for every file in the bucket on each scheduled run.

          #{emitting_file_handling_agent_description}

          ### Writing

          Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.

          Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
        MD
      end

      event_description do
        sample = {
          'file_pointer' => {
            'file' => 'filename',
            'agent_id' => id
          }
        }
        # Only watching agents annotate their events with the kind of change.
        sample['event_type'] = 'modified/added/removed' if boolify(interpolated['watch'])

        # NOTE(review): the separator after "\n\n" is kept exactly as found (one
        # space). If the sample is meant to render as a Markdown indented code
        # block, four spaces would be needed - confirm against the canonical file.
        "Events will look like this:\n\n " + Utils.pretty_print(sample)
      end

      # Options a freshly created agent starts with.
      #
      # @return [Hash{String => String}]
      def default_options
        {
          'mode' => 'read',
          'access_key_id' => '',
          'access_key_secret' => '',
          'watch' => 'true',
          'bucket' => '',
          'data' => '{{ data }}'
        }
      end

      form_configurable :mode, type: :array, values: %w[read write]
      form_configurable :access_key_id, roles: :validatable
      form_configurable :access_key_secret, roles: :validatable
      form_configurable :region, type: :array,
                                 values: %w[us-east-1 us-west-1 us-west-2 eu-west-1 eu-central-1 ap-southeast-1 ap-southeast-2 ap-northeast-1 ap-northeast-2 sa-east-1]
      form_configurable :watch, type: :array, values: %w[true false]
      form_configurable :bucket, roles: :completable
      form_configurable :filename
      form_configurable :data

      # Adds an error to `errors` for every missing or malformed option.
      # `mode`, `bucket` and `region` are always required; `watch` is checked in
      # `read` mode, `filename` and `data` in `write` mode.
      def validate_options
        # A blank mode is not in the list either, so one membership test covers both cases.
        unless %w[read write].include?(options['mode'])
          errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
        end
        errors.add(:base, "The 'bucket' option is required.") if options['bucket'].blank?
        errors.add(:base, "The 'region' option is required.") if options['region'].blank?

        case interpolated['mode']
        when 'read'
          if options['watch'].blank? || ![true, false].include?(boolify(options['watch']))
            errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'")
          end
        when 'write'
          errors.add(:base, "filename must be specified in 'write' mode") if options['filename'].blank?
          errors.add(:base, "data must be specified in 'write' mode") if options['data'].blank?
        end
      end

      # The key id is considered valid when the bucket list can be fetched.
      #
      # @return [Boolean]
      def validate_access_key_id
        !!buckets
      end

      # The secret is considered valid when the bucket list can be fetched.
      #
      # @return [Boolean]
      def validate_access_key_secret
        !!buckets
      end

      # Completion entries for the `bucket` form field.
      #
      # @return [Array<Hash>] one `{ text:, id: }` pair per bucket; empty when
      #   the bucket list could not be fetched
      def complete_bucket
        (buckets || []).map { |bucket| { text: bucket.name, id: bucket.name } }
      end

      # @return [Boolean] whatever `checked_without_error?` reports
      def working?
        checked_without_error?
      end

      # Scheduled run. Does nothing unless the agent is in `read` mode; otherwise
      # lists the bucket and emits events for changes (`watch`) or for every key.
      def check
        return if interpolated['mode'] != 'read'

        contents = safely { get_bucket_contents }
        # `safely` yields nil when the listing failed (the error is already
        # logged). Bail out so a failed run neither raises on a non-Hash nor
        # overwrites the remembered snapshot.
        return if contents.nil?

        if boolify(interpolated['watch'])
          watch(contents)
        else
          contents.each_key { |key| create_event payload: get_file_pointer(key) }
        end
      end

      # Fetches the body of one object from the configured bucket.
      #
      # @param file [String] object key
      # @return the `body` of the `get_object` response
      def get_io(file)
        client.get_object(bucket: interpolated['bucket'], key: file).body
      end

      # Uploads one object per incoming event. Does nothing unless the agent is
      # in `write` mode. Bucket, key and body are interpolated per event.
      #
      # @param incoming_events [Enumerable] events to write
      def receive(incoming_events)
        return if interpolated['mode'] != 'write'

        incoming_events.each do |event|
          safely do
            opts = interpolated(event)
            client.put_object(bucket: opts['bucket'], key: opts['filename'], body: opts['data'])
          end
        end
      end

      private

      # Runs the block and turns S3 service errors into agent error logs.
      #
      # @return the block's value, or nil when an S3 error was rescued
      def safely
        yield
      rescue Aws::S3::Errors::AccessDenied => e
        error("Could not access '#{interpolated['bucket']}' #{e.class} #{e.message}")
        nil
      rescue Aws::S3::Errors::ServiceError => e
        error("#{e.class}: #{e.message}")
        nil
      end

      # Compares the current bucket listing with the remembered one, emits a
      # `removed` / `modified` / `added` event per difference and remembers the
      # new listing.
      #
      # @param contents [Hash{String => String}] object key => etag
      def watch(contents)
        previous = memory['seen_contents']

        # Without a stored snapshot there is nothing to diff against: this is the
        # first run, or earlier runs never stored one. Seed the snapshot and
        # emit nothing.
        unless last_check_at && previous
          self.memory['seen_contents'] = contents
          return
        end

        previous.each do |key, etag|
          if contents[key].blank?
            create_event payload: get_file_pointer(key).merge(event_type: :removed)
          elsif contents[key] != etag
            create_event payload: get_file_pointer(key).merge(event_type: :modified)
          end
        end

        # Keys never seen before, in listing order. Computed without mutating
        # the caller's hash.
        (contents.keys - previous.keys).each do |key|
          create_event payload: get_file_pointer(key).merge(event_type: :added)
        end

        self.memory['seen_contents'] = contents
      end

      # Lists the configured bucket.
      #
      # @return [Hash{String => String}] object key => etag
      def get_bucket_contents
        contents = {}
        # `each` on the list_objects result yields response objects; the entries
        # of every yielded response are collected into one hash.
        client.list_objects(bucket: interpolated['bucket']).each do |response|
          response.contents.each { |file| contents[file.key] = file.etag }
        end
        contents
      end

      # Memoized S3 client built from the interpolated credentials and region.
      def client
        @client ||= Aws::S3::Client.new(
          credentials: Aws::Credentials.new(interpolated['access_key_id'], interpolated['access_key_secret']),
          region: interpolated['region']
        )
      end

      # Memoized list of buckets visible to the configured credentials.
      #
      # @param _log [Boolean] unused; kept so existing call sites keep working
      # @return [Array, false] the buckets, or false when S3 rejected the request
      def buckets(_log = false)
        @buckets ||= client.list_buckets.buckets
      rescue Aws::S3::Errors::ServiceError
        false
      end
    end
  end