Browse Source

Merge pull request #819 from dsander/agent-runner

Add AgentRunner and LongRunnable to support long running agents
Dominik Sander 9 years ago
parent
commit
06b022f161

+ 124 - 0
app/concerns/long_runnable.rb

@@ -0,0 +1,124 @@
+=begin
+Usage Example:
+
+class Agents::ExampleAgent < Agent
+  include LongRunnable
+
+  # Optional
+  #   Override this method if you need to group multiple agents based on an API key,
+  #   or server they connect to.
+  #   Have a look at the TwitterStreamAgent for an example.
+  def self.setup_worker; end
+
+  class Worker < LongRunnable::Worker
+    # Optional
+    #   Called after initialization of the Worker class, use this method as an initializer.
+    def setup; end
+
+    # Required
+    #  Put your agent logic in here, it must not return. If it does your agent will be restarted.
+    def run; end
+
+    # Optional
+    #   Use this method the gracefully stop your agent but make sure the run method return, or
+    #   terminate the thread.
+    def stop; end
+  end
+end
+=end
+module LongRunnable
+  extend ActiveSupport::Concern
+
+  included do |base|
+    AgentRunner.register(base)
+  end
+
+  def start_worker?
+    true
+  end
+
+  def worker_id(config = nil)
+    "#{self.class.to_s}-#{id}-#{Digest::SHA1.hexdigest((config.presence || options).to_json)}"
+  end
+
+  module ClassMethods
+    def setup_worker
+      active.map do |agent|
+        next unless agent.start_worker?
+        self::Worker.new(id: agent.worker_id, agent: agent)
+      end.compact
+    end
+  end
+
+  class Worker
+    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler
+
+    def initialize(options = {})
+      @id = options[:id]
+      @agent = options[:agent]
+      @config = options[:config]
+    end
+
+    def run
+      raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
+    end
+
+    def run!
+      @thread = Thread.new do
+        begin
+          run
+        rescue SignalException, SystemExit
+          stop!
+        rescue StandardError => e
+          message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
+          AgentRunner.with_connection do
+            agent.error(message)
+          end
+        end
+      end
+    end
+
+    def setup!(scheduler, mutex)
+      @scheduler = scheduler
+      @mutex = mutex
+      setup if respond_to?(:setup)
+    end
+
+    def stop!
+      @scheduler.jobs(tag: id).each(&:unschedule)
+
+      if respond_to?(:stop)
+        stop
+      else
+        thread.terminate
+      end
+    end
+
+    def restart!
+      stop!
+      setup!(scheduler, mutex)
+      run!
+    end
+
+    def every(*args, &blk)
+      schedule(:every, args, &blk)
+    end
+
+    def cron(*args, &blk)
+      schedule(:cron, args, &blk)
+    end
+
+    def schedule_in(*args, &blk)
+      schedule(:schedule_in, args, &blk)
+    end
+
+    def boolify(value)
+      agent.send(:boolify, value)
+    end
+
+    private
+    def schedule(method, args, &blk)
+      @scheduler.send(method, *args, tag: id, &blk)
+    end
+  end
+end

+ 85 - 1
app/models/agents/jabber_agent.rb

@@ -1,7 +1,9 @@
 module Agents
   class JabberAgent < Agent
+    include LongRunnable
+    include FormConfigurable
+
     cannot_be_scheduled!
-    cannot_create_events!
 
     gem_dependency_check { defined?(Jabber) }
 
@@ -16,9 +18,22 @@ module Agents
       can contain any keys found in the source's payload, escaped using double curly braces.
       ex: `"News Story: {{title}}: {{url}}"`
 
+      When `connect_to_receiver` is set to true, the JabberAgent will emit an event for every message it receives.
+
       Have a look at the [Wiki](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) to learn more about liquid templating.
     MD
 
+    event_description <<-MD
+      `event` will be set to either `on_join`, `on_leave`, `on_message`, `on_room_message` or `on_subject`
+
+          {
+            "event": "on_message",
+            "time": null,
+            "nick": "Dominik Sander",
+            "message": "Hello from huginn."
+          }
+    MD
+
     def default_options
       {
         'jabber_server'   => '127.0.0.1',
@@ -31,6 +46,15 @@ module Agents
       }
     end
 
+    form_configurable :jabber_server
+    form_configurable :jabber_port
+    form_configurable :jabber_sender
+    form_configurable :jabber_receiver
+    form_configurable :jabber_password
+    form_configurable :message, type: :text
+    form_configurable :connect_to_receiver, type: :boolean
+    form_configurable :expected_receive_period_in_days
+
     def working?
       last_receive_at && last_receive_at > interpolated['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
     end
@@ -50,6 +74,10 @@ module Agents
       client.send Jabber::Message::new(interpolated['jabber_receiver'], text).set_type(:chat)
     end
 
+    def start_worker?
+      boolify(interpolated[:connect_to_receiver])
+    end
+
     private
 
     def client
@@ -66,5 +94,61 @@ module Agents
     def body(event)
       interpolated(event)['message']
     end
+
+    class Worker < LongRunnable::Worker
+      IGNORE_MESSAGES_FOR=5
+
+      def setup
+        require 'xmpp4r/muc/helper/simplemucclient'
+      end
+
+      def run
+        @started_at = Time.now
+        @client = client
+        muc = Jabber::MUC::SimpleMUCClient.new(@client)
+
+        [:on_join, :on_leave, :on_message, :on_room_message, :on_subject].each do |event|
+          muc.__send__(event) do |*args|
+            message_handler(event, args)
+          end
+        end
+
+        muc.join(agent.interpolated['jabber_receiver'])
+
+        sleep(1) while @client.is_connected?
+      end
+
+      def message_handler(event, args)
+        return if Time.now - @started_at < IGNORE_MESSAGES_FOR
+
+        time, nick, message = normalize_args(event, args)
+
+        AgentRunner.with_connection do
+          agent.create_event(payload: {event: event, time: time, nick: nick, message: message})
+        end
+      end
+
+      def stop
+        @client.close
+        @client.stop
+        thread.terminate
+      end
+
+      def client
+        agent.send(:client)
+      end
+
+      private
+      def normalize_args(event, args)
+        case event
+        when :on_join, :on_leave
+          [args[0], args[1]]
+        when :on_message, :on_subject
+          args
+        when :on_room_message
+          [args[0], nil, args[1]]
+        end
+      end
+    end
   end
 end

+ 121 - 0
app/models/agents/twitter_stream_agent.rb

@@ -1,6 +1,7 @@
 module Agents
   class TwitterStreamAgent < Agent
     include TwitterConcern
+    include LongRunnable
 
     cannot_receive_events!
 
@@ -122,5 +123,125 @@ module Agents
         end
       end
     end
+
+    def self.setup_worker
+      if Agents::TwitterStreamAgent.dependencies_missing?
+        STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
+        STDERR.flush
+        return false
+      end
+
+      Agents::TwitterStreamAgent.active.group_by { |agent| agent.twitter_oauth_token }.map do |oauth_token, agents|
+        filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
+
+        agents.each do |agent|
+          agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
+            filter_to_agent_map[filter] << agent
+          end
+        end
+
+        config_hash = filter_to_agent_map.map { |k, v| [k, v.map(&:id)] }
+        config_hash.push(oauth_token)
+
+        Worker.new(id: agents.first.worker_id(config_hash),
+                   config: {filter_to_agent_map: filter_to_agent_map},
+                   agent: agents.first)
+      end
+    end
+
+    class Worker < LongRunnable::Worker
+      RELOAD_TIMEOUT = 60.minutes
+      DUPLICATE_DETECTION_LENGTH = 1000
+      SEPARATOR = /[^\w_\-]+/
+
+      def setup
+        require 'twitter/json_stream'
+        @filter_to_agent_map = @config[:filter_to_agent_map]
+
+        schedule_in RELOAD_TIMEOUT do
+          puts "--> Restarting TwitterStream #{id}"
+          restart!
+        end
+      end
+
+      def run
+        @recent_tweets = []
+        EventMachine.run do
+          stream!(@filter_to_agent_map.keys, @agent) do |status|
+            handle_status(status)
+          end
+        end
+        Thread.stop
+      end
+
+      def stop
+        EventMachine.stop_event_loop if EventMachine.reactor_running?
+        thread.terminate
+      end
+
+      private
+      def stream!(filters, agent, &block)
+        filters = filters.map(&:downcase).uniq
+
+        stream = Twitter::JSONStream.connect(
+          :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
+          :ssl     => true,
+          :oauth   => {
+            :consumer_key    => agent.twitter_consumer_key,
+            :consumer_secret => agent.twitter_consumer_secret,
+            :access_key      => agent.twitter_oauth_token,
+            :access_secret   => agent.twitter_oauth_token_secret
+          }
+        )
+
+        stream.each_item do |status|
+          block.call(status)
+        end
+
+        stream.on_error do |message|
+          STDERR.puts " --> Twitter error: #{message} <--"
+        end
+
+        stream.on_no_data do |message|
+          STDERR.puts " --> Got no data for awhile; trying to reconnect."
+          restart!
+        end
+
+        stream.on_max_reconnects do |timeout, retries|
+          STDERR.puts " --> Oops, tried too many times! <--"
+          sleep 60
+          restart!
+        end
+      end
+
+      def handle_status(status)
+        status = JSON.parse(status) if status.is_a?(String)
+        return unless status
+        return if status.has_key?('delete')
+        return unless status['text']
+        status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
+
+        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
+          puts "Skipping retweet: #{status["text"]}"
+          return
+        elsif @recent_tweets.include?(status["id_str"])
+          puts "Skipping duplicate tweet: #{status["text"]}"
+          return
+        end
+
+        @recent_tweets << status["id_str"]
+        @recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH
+        puts status["text"]
+        @filter_to_agent_map.keys.each do |filter|
+          next unless (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
+          @filter_to_agent_map[filter].each do |agent|
+            puts " -> #{agent.name}"
+            AgentRunner.with_connection do
+              agent.process_tweet(filter, status)
+            end
+          end
+        end
+      end
+    end
   end
 end

+ 9 - 0
bin/agent_runner.rb

@@ -0,0 +1,9 @@
+#!/usr/bin/env ruby
+
+# This process is used to maintain Huginn's upkeep behavior, automatically running scheduled Agents and
+# periodically propagating and expiring Events. It also running TwitterStreamAgents and Agents that support long running
+# background jobs.
+
+require_relative './pre_runner_boot'
+
+AgentRunner.new(except: DelayedJobWorker).run

+ 13 - 0
bin/pre_runner_boot.rb

@@ -0,0 +1,13 @@
+unless defined?(Rails)
+  puts
+  puts "Please run me with rails runner, for example:"
+  puts "  RAILS_ENV=production bundle exec rails runner bin/#{File.basename($0)}"
+  puts
+  exit 1
+end
+
+Rails.configuration.cache_classes = true
+
+Dotenv.load if ENV['APP_SECRET_TOKEN'].blank?
+
+require 'agent_runner'

+ 2 - 9
bin/schedule.rb

@@ -3,13 +3,6 @@
 # This process is used to maintain Huginn's upkeep behavior, automatically running scheduled Agents and
 # periodically propagating and expiring Events.  It's typically run via foreman and the included Procfile.
 
-unless defined?(Rails)
-  puts
-  puts "Please run me with rails runner, for example:"
-  puts "  RAILS_ENV=production bundle exec rails runner bin/schedule.rb"
-  puts
-  exit 1
-end
+require_relative './pre_runner_boot'
 
-scheduler = HuginnScheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
-scheduler.run!
+AgentRunner.new(only: HuginnScheduler).run

+ 7 - 59
bin/threaded.rb

@@ -1,65 +1,13 @@
-require 'thread'
-require 'huginn_scheduler'
-require 'twitter_stream'
+#!/usr/bin/env ruby
 
-Rails.configuration.cache_classes = true
+require_relative './pre_runner_boot'
 
-STDOUT.sync = true
-STDERR.sync = true
-
-def stop
-  puts 'Exiting...'
-  @scheduler.stop
-  @dj.stop
-  @stream.stop
-end
-
-def safely(&block)
-  begin
-    yield block
-  rescue StandardError => e
-    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
-    STDERR.puts "Terminating myself ..."
-    STDERR.flush
-    stop
-  end
-end
-
-threads = []
-threads << Thread.new do
-  safely do
-    @stream = TwitterStream.new
-    @stream.run
-    puts "Twitter stream stopped ..."
-  end
-end
-
-threads << Thread.new do
-  safely do
-    @scheduler = HuginnScheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
-    @scheduler.run!
-    puts "Scheduler stopped ..."
-  end
-end
-
-threads << Thread.new do
-  safely do
-    require 'delayed/command'
-    @dj = Delayed::Worker.new
-    @dj.start
-    puts "Delayed job stopped ..."
-  end
-end
+agent_runner = AgentRunner.new
 
 # We need to wait a bit to let delayed_job set it's traps so we can override them
-sleep 0.5
-
-trap('TERM') do
-  stop
-end
-
-trap('INT') do
-  stop
+Thread.new do
+  sleep 5
+  agent_runner.set_traps
 end
 
-threads.collect { |t| t.join }
+agent_runner.run

+ 2 - 8
bin/twitter_stream.rb

@@ -4,12 +4,6 @@
 # new or changed TwitterStreamAgents and starts to follow the stream for them.  It is typically run by foreman via
 # the included Procfile.
 
-unless defined?(Rails)
-  puts
-  puts "Please run me with rails runner, for example:"
-  puts "  RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb"
-  puts
-  exit 1
-end
+require_relative './pre_runner_boot'
 
-TwitterStream.new.run
+AgentRunner.new(only: Agents::TwitterStreamAgent).run

+ 121 - 0
lib/agent_runner.rb

@@ -0,0 +1,121 @@
+require 'cgi'
+require 'json'
+require 'rufus-scheduler'
+require 'pp'
+require 'twitter'
+
+class AgentRunner
+  @@agents = []
+
+  def initialize(options = {})
+    @workers = {}
+    @signal_queue = []
+    @options = options
+    @options[:only] = [@options[:only]].flatten if @options[:only]
+    @options[:except] = [@options[:except]].flatten if @options[:except]
+    @mutex = Mutex.new
+    @scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
+
+    @scheduler.every 5 do
+      restart_dead_workers if @running
+    end
+
+    @scheduler.every 60 do
+      run_workers if @running
+    end
+
+    set_traps
+  end
+
+  def stop
+    puts "Stopping AgentRunner..."
+    @running = false
+    @workers.each_pair do |_, w| w.stop! end
+    @scheduler.stop
+  end
+
+  def run
+    @running = true
+    run_workers
+
+    while @running
+      if signal = @signal_queue.shift
+        handle_signal(signal)
+      end
+      sleep 0.25
+    end
+    @scheduler.join
+  end
+
+  def set_traps
+    %w(INT TERM QUIT).each do |signal|
+      Signal.trap(signal) { @signal_queue << signal }
+    end
+  end
+
+  def self.register(agent)
+    @@agents << agent unless @@agents.include?(agent)
+  end
+
+  def self.with_connection
+    ActiveRecord::Base.connection_pool.with_connection do
+      yield
+    end
+  end
+
+  private
+  def run_workers
+    workers             = load_workers
+    new_worker_ids      = workers.keys
+    current_worker_ids  = @workers.keys
+
+    (current_worker_ids - new_worker_ids).each do |outdated_worker_id|
+      puts "Killing #{outdated_worker_id}"
+      @workers[outdated_worker_id].stop!
+      @workers.delete(outdated_worker_id)
+    end
+
+    (new_worker_ids - current_worker_ids).each do |new_worker_id|
+      puts "Starting #{new_worker_id}"
+      @workers[new_worker_id] = workers[new_worker_id]
+      @workers[new_worker_id].setup!(@scheduler, @mutex)
+      @workers[new_worker_id].run!
+    end
+  end
+
+  def load_workers
+    workers = {}
+    @@agents.each do |klass|
+      next if @options[:only] && !@options[:only].include?(klass)
+      next if @options[:except] && @options[:except].include?(klass)
+
+      AgentRunner.with_connection do
+        (klass.setup_worker || [])
+      end.each do |agent_worker|
+        workers[agent_worker.id] = agent_worker
+      end
+    end
+    workers
+  end
+
+  def restart_dead_workers
+    @workers.each_pair do |id, worker|
+      if worker.thread && !worker.thread.alive?
+        puts "Restarting #{id.to_s}"
+        @workers[id].run!
+      end
+    end
+  end
+
+  def handle_signal(signal)
+    case signal
+    when 'INT', 'TERM', 'QUIT'
+      stop
+    end
+  end
+end
+
+require 'agents/twitter_stream_agent'
+require 'agents/jabber_agent'
+require 'huginn_scheduler'
+require 'delayed_job_worker'

+ 16 - 0
lib/delayed_job_worker.rb

@@ -0,0 +1,16 @@
+class DelayedJobWorker < LongRunnable::Worker
+  include LongRunnable
+
+  def run
+    @dj = Delayed::Worker.new
+    @dj.start
+  end
+
+  def stop
+    @dj.stop
+  end
+
+  def self.setup_worker
+    [new(id: self.to_s)]
+  end
+end

+ 20 - 22
lib/huginn_scheduler.rb

@@ -92,58 +92,56 @@ class Rufus::Scheduler
   end
 end
 
-class HuginnScheduler
-  FAILED_JOBS_TO_KEEP = 100
-  attr_accessor :mutex
-
-  def initialize(options = {})
-    @rufus_scheduler = Rufus::Scheduler.new(options)
-    self.mutex = Mutex.new
-  end
+class HuginnScheduler < LongRunnable::Worker
+  include LongRunnable
 
-  def stop
-    @rufus_scheduler.stop
-  end
+  FAILED_JOBS_TO_KEEP = 100
 
-  def run!
+  def setup
     tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].presence || "Pacific Time (US & Canada)"]
 
     # Schedule event propagation.
-    @rufus_scheduler.every '1m' do
+    every '1m' do
       propagate!
     end
 
     # Schedule event cleanup.
-    @rufus_scheduler.every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
+    every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
       cleanup_expired_events!
     end
 
     # Schedule failed job cleanup.
-    @rufus_scheduler.every '1h' do
+    every '1h' do
       cleanup_failed_jobs!
     end
 
     # Schedule repeating events.
     %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
-      @rufus_scheduler.every schedule do
+      every schedule do
         run_schedule "every_#{schedule}"
       end
     end
 
     # Schedule events for specific times.
     24.times do |hour|
-      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
+      cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
         run_schedule hour_to_schedule_name(hour)
       end
     end
 
     # Schedule Scheduler Agents
 
-    @rufus_scheduler.every '1m' do
-      @rufus_scheduler.schedule_scheduler_agents
+    every '1m' do
+      @scheduler.schedule_scheduler_agents
     end
+  end
+
+  def run
+    @scheduler.join
+  end
 
-    @rufus_scheduler.join
+  def self.setup_worker
+    [new(id: self.to_s)]
   end
 
   private
@@ -187,8 +185,8 @@ class HuginnScheduler
   end
 
   def with_mutex
-    ActiveRecord::Base.connection_pool.with_connection do
-      mutex.synchronize do
+    mutex.synchronize do
+      ActiveRecord::Base.connection_pool.with_connection do
         yield
       end
     end

+ 0 - 134
lib/twitter_stream.rb

@@ -1,134 +0,0 @@
-require 'cgi'
-require 'json'
-require 'em-http-request'
-require 'pp'
-
-class TwitterStream
-  def initialize
-    @running = true
-  end
-
-  def stop
-    @running = false
-  end
-
-  def stream!(filters, agent, &block)
-    filters = filters.map(&:downcase).uniq
-
-    stream = Twitter::JSONStream.connect(
-      :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
-      :ssl     => true,
-      :oauth   => {
-        :consumer_key    => agent.twitter_consumer_key,
-        :consumer_secret => agent.twitter_consumer_secret,
-        :access_key      => agent.twitter_oauth_token,
-        :access_secret   => agent.twitter_oauth_token_secret
-      }
-    )
-
-    stream.each_item do |status|
-      status = JSON.parse(status) if status.is_a?(String)
-      next unless status
-      next if status.has_key?('delete')
-      next unless status['text']
-      status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, '  ')
-      block.call(status)
-    end
-
-    stream.on_error do |message|
-      STDERR.puts " --> Twitter error: #{message} <--"
-    end
-
-    stream.on_no_data do |message|
-      STDERR.puts " --> Got no data for awhile; trying to reconnect."
-      EventMachine::stop_event_loop
-    end
-
-    stream.on_max_reconnects do |timeout, retries|
-      STDERR.puts " --> Oops, tried too many times! <--"
-      EventMachine::stop_event_loop
-    end
-  end
-
-  def load_and_run(agents)
-    agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
-      filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
-
-      agents.each do |agent|
-        agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
-          filter_to_agent_map[filter] << agent
-        end
-      end
-
-      recent_tweets = []
-
-      stream!(filter_to_agent_map.keys, agents.first) do |status|
-        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
-          puts "Skipping retweet: #{status["text"]}"
-        elsif recent_tweets.include?(status["id_str"])
-          puts "Skipping duplicate tweet: #{status["text"]}"
-        else
-          recent_tweets << status["id_str"]
-          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
-          puts status["text"]
-          filter_to_agent_map.keys.each do |filter|
-            if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
-              filter_to_agent_map[filter].each do |agent|
-                puts " -> #{agent.name}"
-                agent.process_tweet(filter, status)
-              end
-            end
-          end
-        end
-      end
-    end
-  end
-
-  RELOAD_TIMEOUT = 10.minutes
-  DUPLICATE_DETECTION_LENGTH = 1000
-  SEPARATOR = /[^\w_\-]+/
-
-  def run
-    if Agents::TwitterStreamAgent.dependencies_missing?
-      STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
-      STDERR.flush
-      return
-    end
-
-    require 'twitter/json_stream'
-
-    while @running
-      begin
-        agents = Agents::TwitterStreamAgent.active.all
-
-        EventMachine::run do
-          EventMachine.add_periodic_timer(1) {
-            EventMachine::stop_event_loop if !@running
-          }
-
-          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
-            puts "Reloading EventMachine and all Agents..."
-            EventMachine::stop_event_loop
-          }
-
-          if agents.length == 0
-            puts "No agents found.  Will look again in a minute."
-            EventMachine.add_timer(60) {
-              EventMachine::stop_event_loop
-            }
-          else
-            puts "Found #{agents.length} agent(s).  Loading them now..."
-            load_and_run agents
-          end
-        end
-      rescue SignalException, SystemExit
-        @running = false
-        EventMachine::stop_event_loop if EventMachine.reactor_running?
-      rescue StandardError => e
-        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
-        STDERR.puts "Waiting for a couple of minutes..."
-        sleep 120
-      end
-    end
-  end
-end

+ 114 - 0
spec/concerns/long_runnable_spec.rb

@@ -0,0 +1,114 @@
+require 'spec_helper'
+
+describe LongRunnable do
+  class LongRunnableAgent < Agent
+    include LongRunnable
+
+    def default_options
+      {test: 'test'}
+    end
+  end
+
+  before(:all) do
+    @agent = LongRunnableAgent.new
+  end
+
+  it "start_worker? defaults to true" do
+    expect(@agent.start_worker?).to be_truthy
+  end
+
+  it "should build the worker_id" do
+    expect(@agent.worker_id).to eq('LongRunnableAgent--bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f')
+  end
+
+  context "#setup_worker" do
+    it "returns active agent workers" do
+      mock(LongRunnableAgent).active { [@agent] }
+      workers = LongRunnableAgent.setup_worker
+      expect(workers.length).to eq(1)
+      expect(workers.first).to be_a(LongRunnableAgent::Worker)
+      expect(workers.first.agent).to eq(@agent)
+    end
+
+    it "returns an empty array when no agent is active" do
+      mock(LongRunnableAgent).active { [] }
+      workers = LongRunnableAgent.setup_worker
+      expect(workers.length).to eq(0)
+    end
+  end
+
+  describe LongRunnable::Worker do
+    before(:each) do
+      @agent = Object.new
+      @worker = LongRunnable::Worker.new(agent: @agent, id: 'test1234')
+      @worker.setup!(Rufus::Scheduler.new, Mutex.new)
+    end
+
+    it "calls boolify of the agent" do
+      mock(@agent).boolify('true') { true }
+      expect(@worker.boolify('true')).to be_truthy
+    end
+
+    it "expects run to be overriden" do
+      expect { @worker.run }.to raise_error(StandardError)
+    end
+
+    context "#run!" do
+      it "runs the agent worker" do
+        mock(@worker).run
+        @worker.run!.join
+      end
+
+      it "stops when rescueing a SystemExit" do
+        mock(@worker).run { raise SystemExit }
+        mock(@worker).stop!
+        @worker.run!.join
+      end
+
+      it "creates an agent log entry for a generic exception" do
+        stub(STDERR).puts
+        mock(@worker).run { raise "woups" }
+        mock(@agent).error(/woups/)
+        @worker.run!.join
+      end
+    end
+
+    context "#stop!" do
+      it "terminates the thread" do
+        mock(@worker.thread).terminate
+        @worker.stop!
+      end
+
+      it "gracefully stops the worker" do
+        mock(@worker).stop
+        @worker.stop!
+      end
+    end
+
+    context "#restart!" do
+      it "stops, setups and starts the worker" do
+        mock(@worker).stop!
+        mock(@worker).setup!(@worker.scheduler, @worker.mutex)
+        mock(@worker).run!
+        @worker.restart!
+      end
+    end
+
+    context "scheduling" do
+      it "schedules tasks once" do
+        mock(@worker.scheduler).send(:schedule_in, 1.hour, tag: 'test1234')
+        @worker.schedule_in 1.hour do noop; end
+      end
+
+      it "schedules repeating tasks" do
+        mock(@worker.scheduler).send(:every, 1.hour, tag: 'test1234')
+        @worker.every 1.hour do noop; end
+      end
+
+      it "allows the cron syntax" do
+        mock(@worker.scheduler).send(:cron, '0 * * * *', tag: 'test1234')
+        @worker.cron '0 * * * *' do noop; end
+      end
+    end
+  end
+end

+ 102 - 0
spec/lib/agent_runner_spec.rb

@@ -0,0 +1,102 @@
+require 'spec_helper'
+
+describe AgentRunner do
+  context "without traps" do
+    before do
+      stub.instance_of(Rufus::Scheduler).every
+      stub.instance_of(AgentRunner).set_traps
+      @agent_runner = AgentRunner.new
+    end
+
+    context "#run" do
+      before do
+        mock(@agent_runner).run_workers
+        mock.instance_of(IO).puts('Stopping AgentRunner...')
+      end
+
+      it "runs until stop is called" do
+        mock.instance_of(Rufus::Scheduler).join
+        Thread.new { while @agent_runner.instance_variable_get(:@running) != false do sleep 0.1; @agent_runner.stop end }
+        @agent_runner.run
+      end
+
+      it "handles signals" do
+        @agent_runner.instance_variable_set(:@signal_queue, ['TERM'])
+        @agent_runner.run
+      end
+    end
+
+    context "#load_workers" do
+      before do
+        AgentRunner.class_variable_set(:@@agents, [HuginnScheduler, DelayedJobWorker])
+      end
+      it "loads all workers" do
+        workers = @agent_runner.send(:load_workers)
+        expect(workers).to be_a(Hash)
+        expect(workers.keys).to eq(['HuginnScheduler', 'DelayedJobWorker'])
+      end
+
+      it "loads only the workers specified in the :only option" do
+        @agent_runner = AgentRunner.new(only: HuginnScheduler)
+        workers = @agent_runner.send(:load_workers)
+        expect(workers.keys).to eq(['HuginnScheduler'])
+      end
+
+      it "does not load workers specified in the :except option" do
+        @agent_runner = AgentRunner.new(except: HuginnScheduler)
+        workers = @agent_runner.send(:load_workers)
+        expect(workers.keys).to eq(['DelayedJobWorker'])
+      end
+    end
+
+    context "running workers" do
+      before do
+        AgentRunner.class_variable_set(:@@agents, [HuginnScheduler, DelayedJobWorker])
+        stub.instance_of(IO).puts
+        stub.instance_of(LongRunnable::Worker).setup!
+      end
+
+      context "#run_workers" do
+
+        it "runs all the workers" do
+          mock.instance_of(HuginnScheduler).run!
+          mock.instance_of(DelayedJobWorker).run!
+          @agent_runner.send(:run_workers)
+        end
+
+        it "kills no long active workers" do
+          mock.instance_of(HuginnScheduler).run!
+          mock.instance_of(DelayedJobWorker).run!
+          @agent_runner.send(:run_workers)
+          AgentRunner.class_variable_set(:@@agents, [DelayedJobWorker])
+          mock.instance_of(HuginnScheduler).stop!
+          @agent_runner.send(:run_workers)
+        end
+      end
+
+      context "#restart_dead_workers" do
+        before do
+          mock.instance_of(HuginnScheduler).run!
+          mock.instance_of(DelayedJobWorker).run!
+          @agent_runner.send(:run_workers)
+
+        end
+        it "restarts dead workers" do
+          stub.instance_of(HuginnScheduler).thread { OpenStruct.new(alive?: false) }
+          mock.instance_of(HuginnScheduler).run!
+          @agent_runner.send(:restart_dead_workers)
+        end
+      end
+    end
+  end
+
+  context "#set_traps" do
+    it "sets traps for INT TERM and QUIT" do
+      agent_runner = AgentRunner.new
+      mock(Signal).trap('INT')
+      mock(Signal).trap('TERM')
+      mock(Signal).trap('QUIT')
+      agent_runner.set_traps
+    end
+  end
+end

+ 28 - 0
spec/lib/delayed_job_worker_spec.rb

@@ -0,0 +1,28 @@
+require 'spec_helper'
+
+describe DelayedJobWorker do
+  before do
+    @djw = DelayedJobWorker.new
+  end
+
+  it "should run" do
+    mock.instance_of(Delayed::Worker).start
+    @djw.run
+  end
+
+  it "should stop" do
+    mock.instance_of(Delayed::Worker).start
+    mock.instance_of(Delayed::Worker).stop
+    @djw.run
+    @djw.stop
+  end
+
+  context "#setup_worker" do
+    it "should return an array with an instance of itself" do
+      workers = DelayedJobWorker.setup_worker
+      expect(workers).to be_a(Array)
+      expect(workers.first).to be_a(DelayedJobWorker)
+      expect(workers.first.id).to eq('DelayedJobWorker')
+    end
+  end
+end

+ 15 - 7
spec/lib/huginn_scheduler_spec.rb

@@ -4,17 +4,16 @@ require 'huginn_scheduler'
 describe HuginnScheduler do
   before(:each) do
     @scheduler = HuginnScheduler.new
+    stub(@scheduler).setup {}
+    @scheduler.setup!(Rufus::Scheduler.new, Mutex.new)
     stub
   end
 
-  it "should stop the scheduler" do
-    mock.instance_of(Rufus::Scheduler).stop
-    @scheduler.stop
-  end
-
   it "schould register the schedules with the rufus scheduler and run" do
     mock.instance_of(Rufus::Scheduler).join
-    @scheduler.run!
+    scheduler = HuginnScheduler.new
+    scheduler.setup!(Rufus::Scheduler.new, Mutex.new)
+    scheduler.run
   end
 
   it "should run scheduled agents" do
@@ -53,7 +52,7 @@ describe HuginnScheduler do
     end
   end
 
-  describe "cleanup_failed_jobs!" do
+  describe "cleanup_failed_jobs!", focus: true do
     before do
       3.times do |i|
         Delayed::Job.create(failed_at: Time.now - i.minutes)
@@ -75,6 +74,15 @@ describe HuginnScheduler do
       ENV['FAILED_JOBS_TO_KEEP'] = old
     end
   end
+
+  context "#setup_worker" do
+    it "should return an array with an instance of itself" do
+      workers = HuginnScheduler.setup_worker
+      expect(workers).to be_a(Array)
+      expect(workers.first).to be_a(HuginnScheduler)
+      expect(workers.first.id).to eq('HuginnScheduler')
+    end
+  end
 end
 
 describe Rufus::Scheduler do

+ 73 - 0
spec/models/agents/jabber_agent_spec.rb

@@ -44,6 +44,17 @@ describe Agents::JabberAgent do
     end
   end
 
+  context "#start_worker?" do
+    it "starts when connect_to_receiver is truthy" do
+      agent.options[:connect_to_receiver] = 'true'
+      expect(agent.start_worker?).to be_truthy
+    end
+
+    it "does not starts when connect_to_receiver is not truthy" do
+      expect(agent.start_worker?).to be_falsy
+    end
+  end
+
   describe "validation" do
     before do
       expect(agent).to be_valid
@@ -78,4 +89,66 @@ describe Agents::JabberAgent do
                        'Warning! Another Weather Alert! - http://www.weather.com/we-are-screwed'])
     end
   end
+
+  describe Agents::JabberAgent::Worker do
+    before(:each) do
+      @worker = Agents::JabberAgent::Worker.new(agent: agent)
+      @worker.setup
+      stub.any_instance_of(Jabber::Client).connect
+      stub.any_instance_of(Jabber::Client).auth
+    end
+
+    it "runs" do
+      agent.options[:jabber_receiver] = 'someJID'
+      mock.any_instance_of(Jabber::MUC::SimpleMUCClient).join('someJID')
+      @worker.run
+    end
+
+    it "stops" do
+      @worker.instance_variable_set(:@client, @worker.client)
+      mock.any_instance_of(Jabber::Client).close
+      mock.any_instance_of(Jabber::Client).stop
+      mock(@worker).thread { mock!.terminate }
+      @worker.stop
+    end
+
+    context "#message_handler" do
+      it "it ignores messages for the first seconds" do
+        @worker.instance_variable_set(:@started_at, Time.now)
+        expect { @worker.message_handler(:on_message, [123456, 'nick', 'hello']) }
+          .to change { agent.events.count }.by(0)
+      end
+
+      it "creates events" do
+        @worker.instance_variable_set(:@started_at, Time.now - 10.seconds)
+        expect { @worker.message_handler(:on_message, [123456, 'nick', 'hello']) }
+          .to change { agent.events.count }.by(1)
+        event = agent.events.last
+        expect(event.payload).to eq({'event' => 'on_message', 'time' => 123456, 'nick' => 'nick', 'message' => 'hello'})
+      end
+    end
+
+    context "#normalize_args" do
+      it "handles :on_join and :on_leave" do
+        time, nick, message = @worker.send(:normalize_args, :on_join, [123456, 'nick'])
+        expect(time).to eq(123456)
+        expect(nick).to eq('nick')
+        expect(message).to be_nil
+      end
+
+      it "handles :on_message and :on_leave" do
+        time, nick, message = @worker.send(:normalize_args, :on_message, [123456, 'nick', 'hello'])
+        expect(time).to eq(123456)
+        expect(nick).to eq('nick')
+        expect(message).to eq('hello')
+      end
+
+      it "handles :on_room_message" do
+        time, nick, message = @worker.send(:normalize_args, :on_room_message, [123456, 'hello'])
+        expect(time).to eq(123456)
+        expect(nick).to be_nil
+        expect(message).to eq('hello')
+      end
+    end
+  end
 end

+ 145 - 0
spec/models/agents/twitter_stream_agent_spec.rb

@@ -125,4 +125,149 @@ describe Agents::TwitterStreamAgent do
       end
     end
   end
+
+  context "#setup_worker" do
+    it "ensures the dependencies are available" do
+      mock(STDERR).puts(Agents::TwitterStreamAgent.twitter_dependencies_missing)
+      mock(Agents::TwitterStreamAgent).dependencies_missing? { true }
+      expect(Agents::TwitterStreamAgent.setup_worker).to eq(false)
+    end
+
+    it "returns now workers if no agent is active" do
+      mock(Agents::TwitterStreamAgent).active { [] }
+      expect(Agents::TwitterStreamAgent.setup_worker).to eq([])
+    end
+
+    it "returns a worker for an active agent" do
+      mock(Agents::TwitterStreamAgent).active { [@agent] }
+      workers = Agents::TwitterStreamAgent.setup_worker
+      expect(workers).to be_a(Array)
+      expect(workers.length).to eq(1)
+      expect(workers.first).to be_a(Agents::TwitterStreamAgent::Worker)
+      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
+      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2'])
+      expect(filter_to_agent_map.values).to eq([[@agent], [@agent]])
+    end
+
+    it "correctly maps keywords to agents" do
+      agent2 = @agent.dup
+      agent2.id = 123455
+      agent2.options[:filters] = ['agent2']
+      mock(Agents::TwitterStreamAgent).active { [@agent, agent2] }
+
+      workers = Agents::TwitterStreamAgent.setup_worker
+      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
+      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2', 'agent2'])
+      expect(filter_to_agent_map['keyword1']).to eq([@agent])
+      expect(filter_to_agent_map['agent2']).to eq([agent2])
+    end
+  end
+
+  describe Agents::TwitterStreamAgent::Worker do
+    before(:each) do
+      @mock_agent = mock!
+      @config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
+      @worker = Agents::TwitterStreamAgent::Worker.new(@config)
+      @worker.instance_variable_set(:@recent_tweets, [])
+      mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
+      @worker.setup!(nil, Mutex.new)
+    end
+
+    context "#run" do
+      it "starts the stream" do
+        mock(EventMachine).run.yields
+        mock(@worker).stream!(['agent'], @agent)
+        mock(Thread).stop
+        @worker.run
+      end
+
+      it "yields received tweets" do
+        mock(EventMachine).run.yields
+        mock(@worker).stream!(['agent'], @agent).yields('status' => 'hello')
+        mock(@worker).handle_status('status' => 'hello')
+        mock(Thread).stop
+        @worker.run
+      end
+    end
+
+    context "#stop" do
+      it "stops the thread" do
+        mock(@worker.thread).terminate
+        @worker.stop
+      end
+    end
+
+    context "stream!" do
+      def stub_without(method = nil)
+        stream_stub = stub!
+        stream_stub.each_item if method != :each_item
+        stream_stub.on_error if method != :on_error
+        stream_stub.on_no_data if method != :on_no_data
+        stream_stub.on_max_reconnects if method != :on_max_reconnects
+        stub(Twitter::JSONStream).connect { stream_stub }
+        stream_stub
+      end
+
+      it "initializes Twitter::JSONStream" do
+        mock(Twitter::JSONStream).connect({:path=>"/1/statuses/filter.json?track=agent",
+                                           :ssl=>true, :oauth=>{:consumer_key=>"twitteroauthkey",
+                                           :consumer_secret=>"twitteroauthsecret",
+                                           :access_key=>"1234token",
+                                           :access_secret=>"56789secret"}
+                                          }) { stub_without }
+        @worker.send(:stream!, ['agent'], @agent)
+      end
+
+      context "callback handling" do
+        it "logs error messages" do
+          stub_without(:on_error).on_error.yields('woups')
+          mock(STDERR).puts(" --> Twitter error: woups <--")
+          @worker.send(:stream!, ['agent'], @agent)
+        end
+
+        it "stop when no data was received"do
+          stub_without(:on_no_data).on_no_data.yields
+          mock(@worker).restart!
+          mock(STDERR).puts(" --> Got no data for awhile; trying to reconnect.")
+          @worker.send(:stream!, ['agent'], @agent)
+        end
+
+        it "sleeps for 60 seconds on_max_reconnects" do
+          stub_without(:on_max_reconnects).on_max_reconnects.yields
+          mock(STDERR).puts(" --> Oops, tried too many times! <--")
+          mock(@worker).sleep(60)
+          mock(@worker).restart!
+          @worker.send(:stream!, ['agent'], @agent)
+        end
+
+        it "yields every status received" do
+          stub_without(:each_item).each_item.yields({'text' => 'hello'})
+          @worker.send(:stream!, ['agent'], @agent) do |status|
+            expect(status).to eq({'text' => 'hello'})
+          end
+        end
+      end
+    end
+
+    context "#handle_status" do
+      it "skips retweets" do
+        mock.instance_of(IO).puts('Skipping retweet: retweet')
+        @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}})
+      end
+
+      it "deduplicates tweets" do
+        mock.instance_of(IO).puts("dup")
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
+        mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
+        @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
+      end
+
+      it "calls the agent to process the tweet" do
+        stub.instance_of(IO).puts
+        mock(@mock_agent).name { 'mock' }
+        mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
+        @worker.send(:handle_status, {'text' => 'agent'})
+      end
+    end
+  end
 end