# long_runnable.rb
=begin
Usage Example:

class Agents::ExampleAgent < Agent
  include LongRunnable

  # Optional
  # Override this method if you need to group multiple agents based on an API key,
  # or the server they connect to.
  # Have a look at the TwitterStreamAgent for an example.
  def self.setup_worker; end

  class Worker < LongRunnable::Worker
    # Optional
    # Called after initialization of the Worker class; use this method as an initializer.
    def setup; end

    # Required
    # Put your agent logic in here; it must not return. If it does, your agent will be restarted.
    def run; end

    # Optional
    # Use this method to gracefully stop your agent, but make sure the run method returns, or
    # terminate the thread.
    def stop; end
  end
end
=end
  # Mixin for agents whose logic runs inside a long-lived Worker thread.
  # Including the module registers the including class with AgentRunner.
  module LongRunnable
    extend ActiveSupport::Concern

    included do |base|
      AgentRunner.register(base)
    end

    # Whether a worker should be created for this agent (see
    # ClassMethods#setup_worker). Always true here.
    def start_worker?
      true
    end

    # Builds the identifier used for this agent's worker.
    #
    # @param config [Object, nil] optional configuration; when blank, the
    #   agent's +options+ are used instead
    # @return [String] "<class>-<id>-<SHA1 of the JSON-serialized config/options>"
    def worker_id(config = nil)
      settings = config.presence || options
      "#{self.class}-#{id}-#{Digest::SHA1.hexdigest(settings.to_json)}"
    end

    module ClassMethods
      # Builds one Worker per record in +active+ whose start_worker? is truthy.
      #
      # @return [Array<Worker>] the workers (agents that opt out are omitted)
      def setup_worker
        active.map do |agent|
          next unless agent.start_worker?

          self::Worker.new(id: agent.worker_id, agent: agent)
        end.compact
      end
    end

    # Base class for agent workers. Subclasses must override #run and may
    # define #setup and #stop.
    class Worker
      attr_reader :thread, :id, :agent, :config, :mutex, :scheduler

      # @param options [Hash] recognised keys: :id, :agent, :config
      def initialize(options = {})
        @id = options[:id]
        @agent = options[:agent]
        @config = options[:config]
      end

      # Agent logic; must be overridden by subclasses.
      #
      # @raise [StandardError] always, in this base implementation
      def run
        raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
      end

      # Executes #run in a new thread.
      #
      # A SignalException or SystemExit raised inside the thread triggers
      # stop!; any other StandardError is reported through agent.error.
      #
      # @return [Thread] the thread running the worker
      def run!
        @thread = Thread.new do
          begin
            run
          rescue SignalException, SystemExit
            stop!
          rescue StandardError => e
            report_failure(e)
          end
        end
      end

      # Stores the scheduler and mutex, then calls #setup when the subclass
      # responds to it. The mutex is only stored here; this class never locks it.
      def setup!(scheduler, mutex)
        @scheduler = scheduler
        @mutex = mutex
        setup if respond_to?(:setup)
      end

      # Unschedules every scheduler job tagged with this worker's id, then
      # calls #stop when the subclass responds to it, or terminates the thread
      # otherwise.
      #
      # Safe to call on a worker that was never set up or never started:
      # a missing scheduler or thread is skipped instead of raising
      # NoMethodError on nil.
      def stop!
        @scheduler.jobs(tag: id).each(&:unschedule) if @scheduler

        if respond_to?(:stop)
          stop
        elsif thread
          thread.terminate
        end
      end

      # Stops the worker, re-runs setup with the same scheduler and mutex,
      # and starts a new thread.
      def restart!
        stop!
        setup!(scheduler, mutex)
        run!
      end

      # Schedules a block via the scheduler's +every+, tagged with this worker's id.
      def every(*args, &blk)
        schedule(:every, args, &blk)
      end

      # Schedules a block via the scheduler's +cron+, tagged with this worker's id.
      def cron(*args, &blk)
        schedule(:cron, args, &blk)
      end

      # Schedules a block via the scheduler's +schedule_in+, tagged with this worker's id.
      def schedule_in(*args, &blk)
        schedule(:schedule_in, args, &blk)
      end

      # Delegates to the agent's +boolify+; invoked with +send+ so it works
      # even if that method is not public.
      def boolify(value)
        agent.send(:boolify, value)
      end

      private

      # Forwards to the given scheduler method, adding this worker's id as the
      # job tag so stop! can find and unschedule the job later.
      def schedule(method, args, &blk)
        @scheduler.send(method, *args, tag: id, &blk)
      end

      # Logs the exception message and up to ten backtrace lines on the agent,
      # inside AgentRunner.with_connection.
      def report_failure(error)
        # Array() guards against an exception that carries no backtrace.
        trace = Array(error.backtrace).first(10).join("\n")
        message = "#{id} Exception #{error.message}:\n#{trace}"
        AgentRunner.with_connection do
          agent.error(message)
        end
      end
    end
  end