long_runnable.rb 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. =begin
  2. Usage Example:
  3. class Agents::ExampleAgent < Agent
  4. include LongRunnable
  5. # Optional
  6. # Override this method if you need to group multiple agents based on an API key,
  7. # or server they connect to.
  8. # Have a look at the TwitterStreamAgent for an example.
  9. def self.setup_worker; end
  10. class Worker < LongRunnable::Worker
  11. # Optional
  12. # Called after initialization of the Worker class, use this method as an initializer.
  13. def setup; end
  14. # Required
  15. # Put your agent logic in here, it must not return. If it does your agent will be restarted.
  16. def run; end
  17. # Optional
  18. # Use this method the gracefully stop your agent but make sure the run method return, or
  19. # terminate the thread.
  20. def stop; end
  21. end
  22. end
  23. =end
  24. module LongRunnable
  25. extend ActiveSupport::Concern
  26. included do |base|
  27. AgentRunner.register(base)
  28. end
  29. def start_worker?
  30. true
  31. end
  32. def worker_id(config = nil)
  33. "#{self.class.to_s}-#{id}-#{Digest::SHA1.hexdigest((config.presence || options).to_json)}"
  34. end
  35. module ClassMethods
  36. def setup_worker
  37. active.map do |agent|
  38. next unless agent.start_worker?
  39. self::Worker.new(id: agent.worker_id, agent: agent)
  40. end.compact
  41. end
  42. end
  43. class Worker
  44. attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting
  45. def initialize(options = {})
  46. @id = options[:id]
  47. @agent = options[:agent]
  48. @config = options[:config]
  49. @restarting = false
  50. end
  51. def run
  52. raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
  53. end
  54. def run!
  55. @thread = Thread.new do
  56. Thread.current[:name] = "#{id}-#{Time.now}"
  57. begin
  58. run
  59. rescue SignalException, SystemExit
  60. stop!
  61. rescue StandardError => e
  62. message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
  63. AgentRunner.with_connection do
  64. agent.error(message)
  65. end
  66. end
  67. end
  68. end
  69. def setup!(scheduler, mutex)
  70. @scheduler = scheduler
  71. @mutex = mutex
  72. setup if respond_to?(:setup)
  73. end
  74. def stop!
  75. @scheduler.jobs(tag: id).each(&:unschedule)
  76. if respond_to?(:stop)
  77. stop
  78. else
  79. terminate_thread!
  80. end
  81. end
  82. def terminate_thread!
  83. thread.terminate
  84. thread.wakeup if thread.status == 'sleep'
  85. end
  86. def restart!
  87. without_alive_check do
  88. stop!
  89. setup!(scheduler, mutex)
  90. run!
  91. end
  92. end
  93. def every(*args, &blk)
  94. schedule(:every, args, &blk)
  95. end
  96. def cron(*args, &blk)
  97. schedule(:cron, args, &blk)
  98. end
  99. def schedule_in(*args, &blk)
  100. schedule(:schedule_in, args, &blk)
  101. end
  102. def boolify(value)
  103. agent.send(:boolify, value)
  104. end
  105. private
  106. def schedule(method, args, &blk)
  107. @scheduler.send(method, *args, tag: id, &blk)
  108. end
  109. def without_alive_check(&blk)
  110. @restarting = true
  111. yield
  112. ensure
  113. @restarting = false
  114. end
  115. end
  116. end