123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- require 'cgi'
- require 'json'
- require 'rufus-scheduler'
- require 'pp'
- require 'twitter'
# Supervises the workers supplied by registered agent classes: starts workers
# that newly appear, stops workers that are no longer reported, and re-runs
# workers whose thread has died. Queued INT/TERM/QUIT signals shut it down.
class AgentRunner
  # Agent classes added through AgentRunner.register.
  @@agents = []

  # Builds the runner, schedules the periodic maintenance jobs and installs
  # the signal traps.
  #
  # @param options [Hash] optional filters:
  #   :only   - an agent class or array of classes; only these are loaded
  #   :except - an agent class or array of classes; these are skipped
  def initialize(options = {})
    @workers = {}
    @signal_queue = []
    @running = false
    # Work on a copy so the caller's hash (which may be frozen or reused
    # elsewhere) is never modified by the normalisation below.
    @options = options.dup
    [:only, :except].each do |key|
      @options[key] = [@options[key]].flatten if @options[key]
    end
    @mutex = Mutex.new
    # SCHEDULER_FREQUENCY from the environment wins when present; 0.3 otherwise.
    @scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)

    # Both jobs are no-ops until #run has flipped @running to true.
    @scheduler.every(5) { restart_dead_workers if @running }
    @scheduler.every(60) { run_workers if @running }

    set_traps
  end

  # Ends the #run loop, stops every known worker and stops the scheduler.
  def stop
    puts "Stopping AgentRunner..." unless Rails.env.test?
    @running = false
    @workers.each_value(&:stop!)
    @scheduler.stop
  end

  # Starts the workers, then loops until #stop is called, handling at most one
  # queued signal per iteration. Joins the scheduler once the loop has ended.
  def run
    @running = true
    run_workers

    while @running
      signal = @signal_queue.shift
      handle_signal(signal) if signal
      sleep 0.25
    end
    @scheduler.join
  end

  # Installs INT/TERM/QUIT handlers. The handlers only enqueue the signal
  # name; the actual handling happens in the #run loop.
  def set_traps
    %w(INT TERM QUIT).each do |signal|
      Signal.trap(signal) { @signal_queue << signal }
    end
  end

  # Adds an agent class to the registry; registering the same class twice is
  # a no-op.
  def self.register(agent)
    @@agents << agent unless @@agents.include?(agent)
  end

  # Yields inside ActiveRecord's connection_pool.with_connection and returns
  # the block's value.
  def self.with_connection
    ActiveRecord::Base.connection_pool.with_connection { yield }
  end

  private

  # Reconciles the running workers with the set reported by load_workers:
  # workers that disappeared are stopped and dropped, new ones are set up and
  # started. Workers present in both sets are left untouched.
  def run_workers
    loaded = load_workers
    loaded_ids = loaded.keys
    running_ids = @workers.keys

    (running_ids - loaded_ids).each do |outdated_id|
      puts "Killing #{outdated_id}" unless Rails.env.test?
      @workers.delete(outdated_id).stop!
    end

    (loaded_ids - running_ids).each do |new_id|
      puts "Starting #{new_id}" unless Rails.env.test?
      worker = loaded[new_id]
      @workers[new_id] = worker
      worker.setup!(@scheduler, @mutex)
      worker.run!
    end
  end

  # Asks every registered agent class (after applying the :only / :except
  # filters) for its workers.
  #
  # @return [Hash] workers keyed by their id
  def load_workers
    workers = {}
    @@agents.each do |klass|
      next if @options[:only] && !@options[:only].include?(klass)
      next if @options[:except] && @options[:except].include?(klass)

      # setup_worker may return nil; treat that as "no workers".
      found = AgentRunner.with_connection { klass.setup_worker || [] }
      found.each { |agent_worker| workers[agent_worker.id] = agent_worker }
    end
    workers
  end

  # Calls run! again on every worker that has a thread which is no longer
  # alive, unless the worker reports that it is already restarting.
  def restart_dead_workers
    @workers.each_pair do |id, worker|
      next if worker.restarting
      next unless worker.thread && !worker.thread.alive?

      puts "Restarting #{id}" unless Rails.env.test?
      worker.run!
    end
  end

  # Reacts to a signal name taken from the queue; INT, TERM and QUIT all
  # trigger #stop, anything else is ignored.
  def handle_signal(signal)
    case signal
    when 'INT', 'TERM', 'QUIT'
      stop
    end
  end
end
- require 'agents/twitter_stream_agent'
- require 'agents/jabber_agent'
- require 'agents/local_file_agent'
- require 'huginn_scheduler'
- require 'delayed_job_worker'
|