long_runnable.rb 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. =begin
  2. Usage Example:
  3. class Agents::ExampleAgent < Agent
  4. include LongRunnable
  5. # Optional
  6. # Override this method if you need to group multiple agents based on an API key,
  7. # or server they connect to.
  8. # Have a look at the TwitterStreamAgent for an example.
  9. def self.setup_worker; end
  10. class Worker < LongRunnable::Worker
  11. # Optional
  12. # Called after initialization of the Worker class, use this method as an initializer.
  13. def setup; end
  14. # Required
  15. # Put your agent logic in here; it must not return. If it does, your agent will be restarted.
  16. def run; end
  17. # Optional
  18. # Use this method to gracefully stop your agent, but make sure the run method returns, or
  19. # terminate the thread.
  20. def stop; end
  21. end
  22. end
  23. =end
  24. module LongRunnable
  25. extend ActiveSupport::Concern
  # Hook evaluated when LongRunnable is mixed into a class: hands the including
  # class to AgentRunner.register.
  included { |base| AgentRunner.register(base) }
  # Tells `setup_worker` whether a worker should be created for this agent.
  #
  # @return [Boolean] always true in this default implementation
  def start_worker?
    true
  end
  # Builds the identifier used for this agent's worker.
  #
  # The identifier is made of the agent's class name, its `id` and the SHA1 hex
  # digest of the JSON-serialized configuration.
  #
  # @param config [Object, nil] configuration to digest; when it is blank the
  #   agent's `options` are digested instead
  # @return [String] "<class name>-<id>-<sha1 hex digest>"
  def worker_id(config = nil)
    digest_source = config.presence || options
    fingerprint = Digest::SHA1.hexdigest(digest_source.to_json)
    "#{self.class}-#{id}-#{fingerprint}"
  end
  module ClassMethods
    # Builds one worker per active agent that asks for one.
    #
    # Iterates over `active`, skips agents whose `start_worker?` is falsy and
    # instantiates the receiver's own `Worker` class for the others.
    #
    # @return [Array] the created Worker instances
    def setup_worker
      active.each_with_object([]) do |agent, workers|
        next unless agent.start_worker?

        workers << self::Worker.new(id: agent.worker_id, agent: agent)
      end
    end
  end
  # Base class for an agent's background worker.
  #
  # Subclasses must override #run and may define #setup and #stop. The worker
  # runs #run on its own thread and tags every job it schedules with its id so
  # that #stop! can unschedule them again.
  class Worker
    # `restarting` is true only while #restart! is executing.
    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting

    # @param options [Hash] recognised keys: :id, :agent, :config
    def initialize(options = {})
      @id = options[:id]
      @agent = options[:agent]
      @config = options[:config]
      @restarting = false
    end

    # The worker's main logic; must be overridden by subclasses.
    #
    # @raise [StandardError] always, in this base implementation
    def run
      raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
    end

    # Starts #run on a new thread and stores that thread in `thread`.
    #
    # A SignalException or SystemExit raised by #run stops the worker; any other
    # StandardError is reported through `agent.error` inside
    # `AgentRunner.with_connection`.
    #
    # @return [Thread] the newly created thread
    def run!
      @thread = Thread.new do
        Thread.current[:name] = "#{id}-#{Time.now}"
        begin
          run
        rescue SignalException, SystemExit
          stop!
        rescue StandardError => e
          message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
          AgentRunner.with_connection do
            agent.error(message)
          end
        end
      end
    end

    # Stores the scheduler and mutex, then calls the optional #setup hook.
    #
    # @param scheduler [Object] scheduler used by #every, #cron and #schedule_in
    # @param mutex [Object] stored and exposed through `mutex`
    def setup!(scheduler, mutex)
      @scheduler = scheduler
      @mutex = mutex
      setup if respond_to?(:setup)
    end

    # Unschedules every job tagged with this worker's id, then stops the worker
    # via the optional #stop hook or, when none is defined, by terminating the
    # thread.
    def stop!
      # A worker that was never set up has no scheduler and therefore no jobs.
      @scheduler.jobs(tag: id).each(&:unschedule) if @scheduler
      if respond_to?(:stop)
        stop
      else
        terminate_thread!
      end
    end

    # Terminates the worker thread, if there is one.
    def terminate_thread!
      return unless thread

      # NOTE(review): instance_eval only changes `self`; the block still runs on
      # the calling thread, so this releases the caller's connection rather than
      # the worker thread's - confirm that this is intended.
      thread.instance_eval { ActiveRecord::Base.connection_pool.release_connection }
      thread.wakeup if thread.status == 'sleep'
      thread.terminate
    end

    # Stops the worker, sets it up again with the same scheduler and mutex, and
    # starts a new thread. `restarting` is true for the duration of the call.
    def restart!
      without_alive_check do
        puts "--> Restarting #{id} at #{Time.now} <--"
        stop!
        setup!(scheduler, mutex)
        run!
      end
    end

    # Schedules a job via the scheduler's `every`, tagged with this worker's id.
    def every(*args, &blk)
      schedule(:every, args, &blk)
    end

    # Schedules a job via the scheduler's `cron`, tagged with this worker's id.
    def cron(*args, &blk)
      schedule(:cron, args, &blk)
    end

    # Schedules a job via the scheduler's `schedule_in`, tagged with this worker's id.
    def schedule_in(*args, &blk)
      schedule(:schedule_in, args, &blk)
    end

    # Delegates to the agent's `boolify`; `send` is used so a non-public
    # method can be reached as well.
    def boolify(value)
      agent.send(:boolify, value)
    end

    private

    # Forwards a scheduling call to the scheduler, adding `tag: id`.
    #
    # A trailing options hash supplied by the caller is merged with the tag
    # instead of being passed next to it as a second hash; otherwise the caller's
    # hash would occupy a positional slot of the scheduler method and the tag
    # could be lost, leaving #stop! unable to find the job.
    #
    # @param method [Symbol] scheduler method to invoke
    # @param args [Array] arguments given to #every, #cron or #schedule_in
    def schedule(method, args, &blk)
      positional = args.dup
      options = positional.last.is_a?(Hash) ? positional.pop : {}
      @scheduler.send(method, *positional, **options.merge(tag: id), &blk)
    end

    # Yields with `restarting` set to true and always resets it afterwards.
    def without_alive_check
      @restarting = true
      yield
    ensure
      @restarting = false
    end
  end
  120. end