module Agents class S3Agent < Agent include FormConfigurable include FileHandling emits_file_pointer! no_bulk_receive! default_schedule 'every_1h' gem_dependency_check { defined?(Aws::S3) } description do <<~MD The S3Agent can watch a bucket for changes or emit an event for every file in that bucket. When receiving events, it writes the data into a file on S3. #{'## Include `aws-sdk-core` in your Gemfile to use this Agent!' if dependencies_missing?} `mode` must be present and either `read` or `write`, in `read` mode the agent checks the S3 bucket for changed files, with `write` it writes received events to a file in the bucket. ### Universal options To use credentials for the `access_key` and `access_key_secret` use the liquid `credential` tag like so `{% credential name-of-credential %}` Select the `region` in which the bucket was created. ### Reading When `watch` is set to `true` the S3Agent will watch the specified `bucket` for changes. An event will be emitted for every detected change. When `watch` is set to `false` the agent will emit an event for every file in the bucket on each sheduled run. #{emitting_file_handling_agent_description} ### Writing Specify the filename to use in `filename`, Liquid interpolation is possible to change the name per event. Use [Liquid](https://github.com/huginn/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written. MD end event_description do "Events will looks like this:\n\n " + if boolify(interpolated['watch']) Utils.pretty_print({ "file_pointer" => { "file" => "filename", "agent_id" => id }, "event_type" => "modified/added/removed" }) else Utils.pretty_print({ "file_pointer" => { "file" => "filename", "agent_id" => id } }) end end def default_options { 'mode' => 'read', 'access_key_id' => '', 'access_key_secret' => '', 'watch' => 'true', 'bucket' => "", 'data' => '{{ data }}' } end form_configurable :mode, type: :array, values: %w[read write] form_configurable :access_key_id, roles: :validatable form_configurable :access_key_secret, roles: :validatable form_configurable :region, type: :array, values: %w[us-east-1 us-west-1 us-west-2 eu-west-1 eu-central-1 ap-southeast-1 ap-southeast-2 ap-northeast-1 ap-northeast-2 sa-east-1] form_configurable :watch, type: :array, values: %w[true false] form_configurable :bucket, roles: :completable form_configurable :filename form_configurable :data def validate_options if options['mode'].blank? || !['read', 'write'].include?(options['mode']) errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'") end if options['bucket'].blank? errors.add(:base, "The 'bucket' option is required.") end if options['region'].blank? errors.add(:base, "The 'region' option is required.") end case interpolated['mode'] when 'read' if options['watch'].blank? || ![true, false].include?(boolify(options['watch'])) errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'") end when 'write' if options['filename'].blank? errors.add(:base, "filename must be specified in 'write' mode") end if options['data'].blank? errors.add(:base, "data must be specified in 'write' mode") end end end def validate_access_key_id !!buckets end def validate_access_key_secret !!buckets end def complete_bucket (buckets || []).collect { |room| { text: room.name, id: room.name } } end def working? checked_without_error? end def check return if interpolated['mode'] != 'read' contents = safely do get_bucket_contents end if boolify(interpolated['watch']) watch(contents) else contents.each do |key, _| create_event payload: get_file_pointer(key) end end end def get_io(file) client.get_object(bucket: interpolated['bucket'], key: file).body end def receive(incoming_events) return if interpolated['mode'] != 'write' incoming_events.each do |event| safely do mo = interpolated(event) client.put_object(bucket: mo['bucket'], key: mo['filename'], body: mo['data']) end end end private def safely yield rescue Aws::S3::Errors::AccessDenied => e error("Could not access '#{interpolated['bucket']}' #{e.class} #{e.message}") rescue Aws::S3::Errors::ServiceError => e error("#{e.class}: #{e.message}") end def watch(contents) if last_check_at.nil? self.memory['seen_contents'] = contents return end new_memory = contents.dup memory['seen_contents'].each do |key, etag| if contents[key].blank? create_event payload: get_file_pointer(key).merge(event_type: :removed) elsif contents[key] != etag create_event payload: get_file_pointer(key).merge(event_type: :modified) end contents.delete(key) end contents.each do |key, _etag| create_event payload: get_file_pointer(key).merge(event_type: :added) end self.memory['seen_contents'] = new_memory end def get_bucket_contents contents = {} client.list_objects(bucket: interpolated['bucket']).each do |response| response.contents.each do |file| contents[file.key] = file.etag end end contents end def client @client ||= Aws::S3::Client.new(credentials: Aws::Credentials.new(interpolated['access_key_id'], interpolated['access_key_secret']), region: interpolated['region']) end def buckets(log = false) @buckets ||= client.list_buckets.buckets rescue Aws::S3::Errors::ServiceError => e false end end end