Sfoglia il codice sorgente

Add no_bulk_receive! option to ensure each event is processed individually

no_bulk_receive! ensures that agents which connect to an external only receive
one event per job this ensures that failed and retried jobs do not run twice
for the same event.

Fixes #1226
Dominik Sander 9 anni fa
parent
commit
a958b15f18

+ 22 - 6
app/models/agent.rb

@@ -208,6 +208,10 @@ class Agent < ActiveRecord::Base
     self.class.can_dry_run?
   end
 
+  def no_bulk_receive?
+    self.class.no_bulk_receive?
+  end
+
   def log(message, options = {})
     AgentLog.log_for_agent(self, message, options)
   end
@@ -350,6 +354,14 @@ class Agent < ActiveRecord::Base
       !!@can_dry_run
     end
 
+    def no_bulk_receive!
+      @no_bulk_receive = true
+    end
+
+    def no_bulk_receive?
+      !!@no_bulk_receive
+    end
+
     def gem_dependency_check
       @gem_dependencies_checked = true
       @gem_dependencies_met = yield
@@ -365,7 +377,7 @@ class Agent < ActiveRecord::Base
     def receive!(options={})
       Agent.transaction do
         scope = Agent.
-                select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
+                select("agents.id AS receiver_agent_id, events.id AS event_id").
                 joins("JOIN links ON (links.receiver_id = agents.id)").
                 joins("JOIN agents AS sources ON (links.source_id = sources.id)").
                 joins("JOIN events ON (events.agent_id = sources.id AND events.id > links.event_id_at_creation)").
@@ -377,21 +389,25 @@ class Agent < ActiveRecord::Base
         sql = scope.to_sql()
 
         agents_to_events = {}
-        Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
+        Agent.connection.select_rows(sql).each do |receiver_agent_id, event_id|
           agents_to_events[receiver_agent_id.to_i] ||= []
           agents_to_events[receiver_agent_id.to_i] << event_id
         end
 
-        event_ids = agents_to_events.values.flatten.uniq.compact
-
         Agent.where(:id => agents_to_events.keys).each do |agent|
+          event_ids = agents_to_events[agent.id].uniq
           agent.update_attribute :last_checked_event_id, event_ids.max
-          Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
+
+          if agent.no_bulk_receive?
+            event_ids.each { |event_id| Agent.async_receive(agent.id, [event_id]) }
+          else
+            Agent.async_receive(agent.id, event_ids)
+          end
         end
 
         {
           :agent_count => agents_to_events.keys.length,
-          :event_count => event_ids.length
+          :event_count => agents_to_events.values.flatten.uniq.compact.length
         }
       end
     end

+ 1 - 0
app/models/agents/beeper_agent.rb

@@ -2,6 +2,7 @@ module Agents
   class BeeperAgent < Agent
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     description <<-MD
       Beeper agent sends messages to Beeper app on your mobile device via Push notifications.

+ 1 - 0
app/models/agents/dropbox_file_url_agent.rb

@@ -3,6 +3,7 @@ module Agents
     include DropboxConcern
 
     cannot_be_scheduled!
+    no_bulk_receive!
 
     description <<-MD
       The Dropbox File Url Agent is used to work with Dropbox. It takes a file path (or multiple file paths) and emits events with [temporary links](https://www.dropbox.com/developers/core/docs#media).

+ 1 - 0
app/models/agents/email_agent.rb

@@ -4,6 +4,7 @@ module Agents
 
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     description <<-MD
       The Email Agent sends any events it receives via email immediately.

+ 1 - 0
app/models/agents/google_calendar_publish_agent.rb

@@ -3,6 +3,7 @@ require 'json'
 module Agents
   class GoogleCalendarPublishAgent < Agent
     cannot_be_scheduled!
+    no_bulk_receive!
 
     gem_dependency_check { defined?(Google) && defined?(Google::APIClient) }
 

+ 1 - 0
app/models/agents/hipchat_agent.rb

@@ -4,6 +4,7 @@ module Agents
 
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     gem_dependency_check { defined?(HipChat) }
 

+ 1 - 0
app/models/agents/pdf_info_agent.rb

@@ -7,6 +7,7 @@ module Agents
     gem_dependency_check { defined?(HyPDF) }
 
     cannot_be_scheduled!
+    no_bulk_receive!
 
     description <<-MD
       The PDF Info Agent returns the metadata contained within a given PDF file, using HyPDF.

+ 1 - 0
app/models/agents/post_agent.rb

@@ -3,6 +3,7 @@ module Agents
     include WebRequestConcern
 
     can_dry_run!
+    no_bulk_receive!
     default_schedule "never"
 
     description <<-MD

+ 1 - 0
app/models/agents/pushbullet_agent.rb

@@ -4,6 +4,7 @@ module Agents
 
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     before_validation :create_device, on: :create
 

+ 2 - 0
app/models/agents/pushover_agent.rb

@@ -2,6 +2,8 @@ module Agents
   class PushoverAgent < Agent
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
+
 
     API_URL = 'https://api.pushover.net/1/messages.json'
 

+ 2 - 0
app/models/agents/shell_command_agent.rb

@@ -3,6 +3,8 @@ module Agents
     default_schedule "never"
 
     can_dry_run!
+    no_bulk_receive!
+
 
     def self.should_run?
       ENV['ENABLE_INSECURE_AGENTS'] == "true"

+ 1 - 0
app/models/agents/slack_agent.rb

@@ -5,6 +5,7 @@ module Agents
 
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     gem_dependency_check { defined?(Slack) }
 

+ 1 - 0
app/models/agents/twilio_agent.rb

@@ -4,6 +4,7 @@ module Agents
   class TwilioAgent < Agent
     cannot_be_scheduled!
     cannot_create_events!
+    no_bulk_receive!
 
     gem_dependency_check { defined?(Twilio) }
 

+ 1 - 0
app/models/agents/website_agent.rb

@@ -7,6 +7,7 @@ module Agents
 
     can_dry_run!
     can_order_created_events!
+    no_bulk_receive!
 
     default_schedule "every_12h"
 

+ 1 - 0
app/models/agents/witai_agent.rb

@@ -1,6 +1,7 @@
 module Agents
   class WitaiAgent < Agent
     cannot_be_scheduled!
+    no_bulk_receive!
 
     description <<-MD
       The `wit.ai` agent receives events, sends a text query to your `wit.ai` instance and generates outcome events.

+ 1 - 0
app/models/agents/wunderlist_agent.rb

@@ -5,6 +5,7 @@ module Agents
     valid_oauth_providers :wunderlist
 
     cannot_be_scheduled!
+    no_bulk_receive!
 
     gem_dependency_check { Devise.omniauth_providers.include?(:wunderlist) }
 

+ 2 - 2
spec/lib/huginn_scheduler_spec.rb

@@ -56,7 +56,7 @@ describe HuginnScheduler do
     end
   end
 
-  describe "cleanup_failed_jobs!", focus: true do
+  describe "cleanup_failed_jobs!" do
     before do
       3.times do |i|
         Delayed::Job.create(failed_at: Time.now - i.minutes)
@@ -64,7 +64,7 @@ describe HuginnScheduler do
       @keep = Delayed::Job.order(:failed_at)[1]
     end
 
-    it "work with set FAILED_JOBS_TO_KEEP env variable", focus: true do
+    it "work with set FAILED_JOBS_TO_KEEP env variable" do
       expect { @scheduler.send(:cleanup_failed_jobs!) }.to change(Delayed::Job, :count).by(-1)
       expect { @scheduler.send(:cleanup_failed_jobs!) }.to change(Delayed::Job, :count).by(0)
       expect(@keep.id).to eq(Delayed::Job.order(:failed_at)[0].id)

+ 8 - 0
spec/models/agent_spec.rb

@@ -295,6 +295,14 @@ describe Agent do
         Agent.receive!
       end
 
+      it "should call receive for each event when no_bulk_receive! is used" do
+        mock.any_instance_of(Agents::TriggerAgent).receive(anything).twice
+        stub(Agents::TriggerAgent).no_bulk_receive? { true }
+        Agent.async_check(agents(:bob_weather_agent).id)
+        Agent.async_check(agents(:bob_weather_agent).id)
+        Agent.receive!
+      end
+
       it "should ignore events that were created before a particular Link" do
         agent2 = Agents::SomethingSource.new(:name => "something")
         agent2.user = users(:bob)

+ 7 - 0
spec/rails_helper.rb

@@ -50,6 +50,13 @@ RSpec.configure do |config|
   # rspec-rails.
   config.infer_base_class_for_anonymous_controllers = false
 
+  # These two settings work together to allow you to limit a spec run
+  # to individual examples or groups you care about by tagging them with
+  # `:focus` metadata. When nothing is tagged with `:focus`, all examples
+  # get run.
+  config.filter_run :focus
+  config.run_all_when_everything_filtered = true
+
   # Run specs in random order to surface order dependencies. If you find an
   # order dependency and want to debug it, you can fix the order by providing
   # the seed, which is printed after each run.

+ 1 - 1
spec/support/shared_examples/liquid_interpolatable.rb

@@ -30,7 +30,7 @@ shared_examples_for LiquidInterpolatable do
       })
     end
 
-    it "should work with arrays", focus: true do
+    it "should work with arrays" do
       @checker.options = {"value" => ["{{variable}}", "Much array", "Hey, {{hello_world}}"]}
       expect(@checker.interpolate_options(@checker.options, @event)).to eq({
         "value" => ["hello", "Much array", "Hey, Hello world"]