agent_runner.rb 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. require 'cgi'
  2. require 'json'
  3. require 'rufus-scheduler'
  4. require 'pp'
  5. require 'twitter'
  # Supervises a set of agent workers.
  #
  # Workers are obtained from the agent classes registered through
  # AgentRunner.register (each class is asked for +setup_worker+). A
  # Rufus scheduler periodically reloads the worker list and restarts workers
  # whose thread is no longer alive. INT/TERM/QUIT signals are queued by the
  # trap handlers and processed from the loop in #run.
  class AgentRunner
    # Agent classes added through AgentRunner.register.
    @@agents = []

    # @param options [Hash] keys read by this class:
    #   :only   - agent class or array of classes; when set, only these are loaded
    #   :except - agent class or array of classes; when set, these are skipped
    def initialize(options = {})
      @workers = {}
      @signal_queue = []
      # Work on a copy: the normalization below must not write into the
      # caller's hash (and must not fail when that hash is frozen).
      @options = options.dup
      @options[:only] = [@options[:only]].flatten if @options[:only]
      @options[:except] = [@options[:except]].flatten if @options[:except]
      # The scheduler callbacks below read @running before #run has set it.
      @running = false
      @mutex = Mutex.new
      @scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
      @scheduler.every(5) { restart_dead_workers if @running }
      @scheduler.every(60) { run_workers if @running }
      set_traps
    end

    # Leaves the #run loop, stops every current worker and stops the scheduler.
    def stop
      puts "Stopping AgentRunner..."
      @running = false
      @workers.each_value(&:stop!)
      @scheduler.stop
    end

    # Starts the workers, then polls the signal queue until #stop clears
    # @running; finally joins the scheduler.
    def run
      @running = true
      run_workers
      while @running
        signal = @signal_queue.shift
        handle_signal(signal) if signal
        sleep 0.25
      end
      @scheduler.join
    end

    # Installs handlers for INT, TERM and QUIT. The handlers only enqueue the
    # signal name; it is acted upon later by the loop in #run.
    def set_traps
      %w(INT TERM QUIT).each do |signal|
        Signal.trap(signal) { @signal_queue << signal }
      end
    end

    # Adds +agent+ to the list of agent classes unless it is already present.
    def self.register(agent)
      @@agents << agent unless @@agents.include?(agent)
    end

    # Runs the given block inside ActiveRecord's
    # +connection_pool.with_connection+ and returns whatever that call returns.
    def self.with_connection
      ActiveRecord::Base.connection_pool.with_connection { yield }
    end

    private

    # Reconciles @workers with the freshly loaded worker list: workers whose id
    # is no longer present are stopped and removed, workers with a new id are
    # set up (with the shared scheduler and mutex) and started.
    def run_workers
      workers = load_workers
      new_worker_ids = workers.keys
      current_worker_ids = @workers.keys

      (current_worker_ids - new_worker_ids).each do |outdated_worker_id|
        puts "Killing #{outdated_worker_id}"
        @workers[outdated_worker_id].stop!
        @workers.delete(outdated_worker_id)
      end

      (new_worker_ids - current_worker_ids).each do |new_worker_id|
        puts "Starting #{new_worker_id}"
        worker = workers[new_worker_id]
        @workers[new_worker_id] = worker
        worker.setup!(@scheduler, @mutex)
        worker.run!
      end
    end

    # Builds a Hash of worker id => worker from every registered agent class
    # that passes the :only / :except filters. A nil +setup_worker+ result is
    # treated as "no workers".
    def load_workers
      @@agents.each_with_object({}) do |klass, workers|
        next if @options[:only] && !@options[:only].include?(klass)
        next if @options[:except] && @options[:except].include?(klass)

        agent_workers = AgentRunner.with_connection { klass.setup_worker || [] }
        agent_workers.each { |agent_worker| workers[agent_worker.id] = agent_worker }
      end
    end

    # Calls run! again on every worker that has a thread which is not alive.
    def restart_dead_workers
      @workers.each_pair do |id, worker|
        next unless worker.thread && !worker.thread.alive?

        puts "Restarting #{id}"
        worker.run!
      end
    end

    # Reacts to a signal name taken from the signal queue; all handled signals
    # trigger #stop.
    def handle_signal(signal)
      case signal
      when 'INT', 'TERM', 'QUIT'
        stop
      end
    end
  end
  99. require 'agents/twitter_stream_agent'
  100. require 'agents/jabber_agent'
  101. require 'huginn_scheduler'
  102. require 'delayed_job_worker'