Browse Source

Merge pull request #1055 from cantino/delay_agent

Add DelayAgent for buffering incoming Events
Andrew Cantino 9 years ago
parent
commit
e4d11aae7e

+ 65 - 0
app/models/agents/delay_agent.rb

@@ -0,0 +1,65 @@
+module Agents
+  class DelayAgent < Agent
+    default_schedule "every_12h"
+
+    description <<-MD
+      The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
+
+      `max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
+      reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
+      whether you set `keep` to `newest` or `oldest`.
+
+      `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
+      that you anticipate passing without this Agent receiving an incoming Event.
+    MD
+
+    def default_options
+      {
+        'expected_receive_period_in_days' => "10",
+        'max_events' => "100",
+        'keep' => 'newest'
+      }
+    end
+
+    def validate_options
+      unless options['expected_receive_period_in_days'].present? && options['expected_receive_period_in_days'].to_i > 0
+        errors.add(:base, "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
+      end
+
+      unless options['keep'].present? && options['keep'].in?(%w[newest oldest])
+        errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
+      end
+
+      unless options['max_events'].present? && options['max_events'].to_i > 0
+        errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
+      end
+    end
+
+    def working?
+      last_receive_at && last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
+    end
+
+    def receive(incoming_events)
+      incoming_events.each do |event|
+        memory['event_ids'] ||= []
+        memory['event_ids'] << event.id
+        if memory['event_ids'].length > interpolated['max_events'].to_i
+          if interpolated['keep'] == 'newest'
+            memory['event_ids'].shift
+          else
+            memory['event_ids'].pop
+          end
+        end
+      end
+    end
+
+    def check
+      if memory['event_ids'] && memory['event_ids'].length > 0
+        received_events.where(id: memory['event_ids']).reorder('events.id asc').each do |event|
+          create_event payload: event.payload
+        end
+        memory['event_ids'] = []
+      end
+    end
+  end
+end

+ 1 - 1
config/initializers/delayed_job.rb

@@ -17,4 +17,4 @@ end
 
 Delayed::Backend::ActiveRecord.configure do |config|
   config.reserve_sql_strategy = :default_sql
-end
+end

+ 112 - 0
spec/models/agents/delay_agent_spec.rb

@@ -0,0 +1,112 @@
+require 'spec_helper'
+
+describe Agents::DelayAgent do
+  let(:agent) do
+    _agent = Agents::DelayAgent.new(name: 'My DelayAgent')
+    _agent.options = _agent.default_options.merge('max_events' => 2)
+    _agent.user = users(:bob)
+    _agent.sources << agents(:bob_website_agent)
+    _agent.save!
+    _agent
+  end
+
+  def create_event
+    _event = Event.new(payload: { random: rand })
+    _event.agent = agents(:bob_website_agent)
+    _event.save!
+    _event
+  end
+
+  let(:first_event) { create_event }
+  let(:second_event) { create_event }
+  let(:third_event) { create_event }
+
+  describe "#working?" do
+    it "checks if events have been received within expected receive period" do
+      expect(agent).not_to be_working
+      Agents::DelayAgent.async_receive agent.id, [events(:bob_website_agent_event).id]
+      expect(agent.reload).to be_working
+      the_future = (agent.options[:expected_receive_period_in_days].to_i + 1).days.from_now
+      stub(Time).now { the_future }
+      expect(agent.reload).not_to be_working
+    end
+  end
+
+  describe "validation" do
+    before do
+      expect(agent).to be_valid
+    end
+
+    it "should validate max_events" do
+      agent.options.delete('max_events')
+      expect(agent).not_to be_valid
+      agent.options['max_events'] = ""
+      expect(agent).not_to be_valid
+      agent.options['max_events'] = "0"
+      expect(agent).not_to be_valid
+      agent.options['max_events'] = "10"
+      expect(agent).to be_valid
+    end
+
+    it "should validate presence of expected_receive_period_in_days" do
+      agent.options['expected_receive_period_in_days'] = ""
+      expect(agent).not_to be_valid
+      agent.options['expected_receive_period_in_days'] = 0
+      expect(agent).not_to be_valid
+      agent.options['expected_receive_period_in_days'] = -1
+      expect(agent).not_to be_valid
+    end
+
+    it "should validate keep" do
+      agent.options.delete('keep')
+      expect(agent).not_to be_valid
+      agent.options['keep'] = ""
+      expect(agent).not_to be_valid
+      agent.options['keep'] = 'wrong'
+      expect(agent).not_to be_valid
+      agent.options['keep'] = 'newest'
+      expect(agent).to be_valid
+      agent.options['keep'] = 'oldest'
+      expect(agent).to be_valid
+    end
+  end
+
+  describe "#receive" do
+    it "records Events" do
+      expect(agent.memory).to be_empty
+      agent.receive([first_event])
+      expect(agent.memory).not_to be_empty
+      agent.receive([second_event])
+      expect(agent.memory['event_ids']).to eq [first_event.id, second_event.id]
+    end
+
+    it "keeps the newest when 'keep' is set to 'newest'" do
+      expect(agent.options['keep']).to eq 'newest'
+      agent.receive([first_event, second_event, third_event])
+      expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]
+    end
+
+    it "keeps the oldest when 'keep' is set to 'oldest'" do
+      agent.options['keep'] = 'oldest'
+      agent.receive([first_event, second_event, third_event])
+      expect(agent.memory['event_ids']).to eq [first_event.id, second_event.id]
+    end
+  end
+
+  describe "#check" do
+    it "re-emits Events and clears the memory" do
+      agent.receive([first_event, second_event, third_event])
+      expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]
+
+      expect {
+        agent.check
+      }.to change { agent.events.count }.by(2)
+
+      events = agent.events.reorder('events.id desc')
+      expect(events.first.payload).to eq third_event.payload
+      expect(events.second.payload).to eq second_event.payload
+
+      expect(agent.memory['event_ids']).to eq []
+    end
+  end
+end