123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
module Agents
  # Agent that reads from or writes to an Amazon S3 bucket.
  #
  # In `read` mode every scheduled check lists the bucket and either emits one
  # change event per added/modified/removed key (`watch` enabled) or one file
  # pointer event per key (`watch` disabled). In `write` mode every received
  # event is interpolated and stored as an object in the bucket.
  class S3Agent < Agent
    include FormConfigurable
    include FileHandling

    emits_file_pointer!
    no_bulk_receive!

    default_schedule 'every_1h'

    gem_dependency_check { defined?(Aws::S3) }

    # NOTE(review): the Markdown below has no blank lines between paragraphs
    # and headings in the copy this was reviewed from — confirm it renders as
    # intended and restore paragraph breaks if they were lost.
    description do
      <<~MD
        The S3Agent can watch a bucket for changes or emit an event for every file in that bucket. When receiving events, it writes the data into a file on S3.
        #{'## Include `aws-sdk-core` in your Gemfile to use this Agent!' if dependencies_missing?}
        `mode` must be present and either `read` or `write`, in `read` mode the agent checks the S3 bucket for changed files, with `write` it writes received events to a file in the bucket.
        ### Universal options
        To use credentials for the `access_key_id` and `access_key_secret` use the liquid `credential` tag like so `{% credential name-of-credential %}`
        Select the `region` in which the bucket was created.
        ### Reading
        When `watch` is set to `true` the S3Agent will watch the specified `bucket` for changes. An event will be emitted for every detected change.
        When `watch` is set to `false` the agent will emit an event for every file in the bucket on each scheduled run.
        #{emitting_file_handling_agent_description}
        ### Writing
        Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event.
        Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written.
      MD
    end

    # Example payload shown to the user; `event_type` is only part of the
    # payload when the agent is watching for changes.
    event_description do
      example = {
        'file_pointer' => {
          'file' => 'filename',
          'agent_id' => id
        }
      }
      example['event_type'] = 'modified/added/removed' if boolify(interpolated['watch'])

      "Events will look like this:\n\n " + Utils.pretty_print(example)
    end

    # Options a freshly created agent starts with.
    def default_options
      {
        'mode' => 'read',
        'access_key_id' => '',
        'access_key_secret' => '',
        'watch' => 'true',
        'bucket' => '',
        'data' => '{{ data }}'
      }
    end

    form_configurable :mode, type: :array, values: %w[read write]
    form_configurable :access_key_id, roles: :validatable
    form_configurable :access_key_secret, roles: :validatable
    form_configurable :region, type: :array,
                               values: %w[us-east-1 us-west-1 us-west-2 eu-west-1 eu-central-1 ap-southeast-1
                                          ap-southeast-2 ap-northeast-1 ap-northeast-2 sa-east-1]
    form_configurable :watch, type: :array, values: %w[true false]
    form_configurable :bucket, roles: :completable
    form_configurable :filename
    form_configurable :data

    # Adds an error to `errors` for every missing or invalid option.
    # `mode`, `bucket` and `region` are always required; `watch` is checked in
    # read mode, `filename` and `data` in write mode.
    def validate_options
      # A blank mode is not in the list either, so one inclusion test covers both cases.
      unless %w[read write].include?(options['mode'])
        errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'")
      end
      errors.add(:base, "The 'bucket' option is required.") if options['bucket'].blank?
      errors.add(:base, "The 'region' option is required.") if options['region'].blank?

      case interpolated['mode']
      when 'read'
        if options['watch'].blank? || ![true, false].include?(boolify(options['watch']))
          errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'")
        end
      when 'write'
        errors.add(:base, "filename must be specified in 'write' mode") if options['filename'].blank?
        errors.add(:base, "data must be specified in 'write' mode") if options['data'].blank?
      end
    end

    # Form validation hook for `access_key_id`: true when the bucket list
    # could be fetched with the configured credentials.
    def validate_access_key_id
      !!buckets
    end

    # Form validation hook for `access_key_secret`: true when the bucket list
    # could be fetched with the configured credentials.
    def validate_access_key_secret
      !!buckets
    end

    # Form completion hook for `bucket`: one `{ text:, id: }` entry per
    # bucket, or an empty list when the buckets cannot be listed.
    def complete_bucket
      (buckets || []).map { |bucket| { text: bucket.name, id: bucket.name } }
    end

    def working?
      checked_without_error?
    end

    # Scheduled run. Does nothing unless the agent is in read mode.
    def check
      return if interpolated['mode'] != 'read'

      contents = safely { get_bucket_contents }
      # `safely` has already logged the failure; there is no listing to process.
      return if contents.nil?

      if boolify(interpolated['watch'])
        watch(contents)
      else
        contents.each_key do |key|
          create_event payload: get_file_pointer(key)
        end
      end
    end

    # Returns the body of the object stored under `file` in the configured bucket.
    def get_io(file)
      client.get_object(bucket: interpolated['bucket'], key: file).body
    end

    # Stores every incoming event as an S3 object. Does nothing unless the
    # agent is in write mode. A failure for one event is logged and does not
    # prevent the remaining events from being written.
    def receive(incoming_events)
      return if interpolated['mode'] != 'write'

      incoming_events.each do |event|
        safely do
          mo = interpolated(event)
          client.put_object(bucket: mo['bucket'], key: mo['filename'], body: mo['data'])
        end
      end
    end

    private

    # Runs the block and returns its value. S3 service errors are logged via
    # `error` and turned into a `nil` return value so callers can tell a
    # failure apart from a real result.
    def safely
      yield
    rescue Aws::S3::Errors::AccessDenied => e
      error("Could not access '#{interpolated['bucket']}' #{e.class} #{e.message}")
      nil
    rescue Aws::S3::Errors::ServiceError => e
      error("#{e.class}: #{e.message}")
      nil
    end

    # Compares the current bucket listing with the one remembered from the
    # previous run and emits a `removed`, `modified` or `added` event per
    # changed key, then remembers the current listing.
    #
    # contents - Hash of object key => etag for the current bucket state.
    def watch(contents)
      seen = memory['seen_contents']

      # First run, or no snapshot stored yet (e.g. earlier checks ran with
      # `watch` disabled): record a baseline without emitting events.
      if last_check_at.nil? || seen.nil?
        self.memory['seen_contents'] = contents
        return
      end

      seen.each do |key, etag|
        if contents[key].blank?
          create_event payload: get_file_pointer(key).merge(event_type: :removed)
        elsif contents[key] != etag
          create_event payload: get_file_pointer(key).merge(event_type: :modified)
        end
      end

      # Keys that were not part of the previous snapshot are new.
      (contents.keys - seen.keys).each do |key|
        create_event payload: get_file_pointer(key).merge(event_type: :added)
      end

      self.memory['seen_contents'] = contents
    end

    # Returns a Hash of object key => etag built from every response yielded
    # by `list_objects` for the configured bucket.
    def get_bucket_contents
      contents = {}
      client.list_objects(bucket: interpolated['bucket']).each do |response|
        response.contents.each do |file|
          contents[file.key] = file.etag
        end
      end
      contents
    end

    # Memoized S3 client built from the interpolated credentials and region.
    def client
      @client ||= Aws::S3::Client.new(
        credentials: Aws::Credentials.new(interpolated['access_key_id'], interpolated['access_key_secret']),
        region: interpolated['region']
      )
    end

    # Returns the (memoized) list of buckets visible to the configured
    # credentials, or false when the request fails with a service error.
    # The optional argument is unused and only kept so existing callers that
    # pass it continue to work.
    def buckets(_log = false)
      @buckets ||= client.list_buckets.buckets
    rescue Aws::S3::Errors::ServiceError
      false
    end
  end
end
|