123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- =begin
- Usage Example:
- class Agents::ExampleAgent < Agent
- include LongRunnable
- # Optional
- # Override this method if you need to group multiple agents based on an API key,
- # or server they connect to.
- # Have a look at the TwitterStreamAgent for an example.
- def self.setup_worker; end
- class Worker < LongRunnable::Worker
- # Optional
- # Called after initialization of the Worker class, use this method as an initializer.
- def setup; end
- # Required
- # Put your agent logic in here, it must not return. If it does your agent will be restarted.
- def run; end
- # Optional
- # Use this method the gracefully stop your agent but make sure the run method return, or
- # terminate the thread.
- def stop; end
- end
- end
- =end
- module LongRunnable
- extend ActiveSupport::Concern
- included do |base|
- AgentRunner.register(base)
- end
- def start_worker?
- true
- end
- def worker_id(config = nil)
- "#{self.class.to_s}-#{id}-#{Digest::SHA1.hexdigest((config.presence || options).to_json)}"
- end
- module ClassMethods
- def setup_worker
- active.map do |agent|
- next unless agent.start_worker?
- self::Worker.new(id: agent.worker_id, agent: agent)
- end.compact
- end
- end
- class Worker
- attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting
- def initialize(options = {})
- @id = options[:id]
- @agent = options[:agent]
- @config = options[:config]
- @restarting = false
- end
- def run
- raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.'
- end
- def run!
- @thread = Thread.new do
- Thread.current[:name] = "#{id}-#{Time.now}"
- begin
- run
- rescue SignalException, SystemExit
- stop!
- rescue StandardError => e
- message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}"
- AgentRunner.with_connection do
- agent.error(message)
- end
- end
- end
- end
- def setup!(scheduler, mutex)
- @scheduler = scheduler
- @mutex = mutex
- setup if respond_to?(:setup)
- end
- def stop!
- @scheduler.jobs(tag: id).each(&:unschedule)
- if respond_to?(:stop)
- stop
- else
- terminate_thread!
- end
- end
- def terminate_thread!
- if thread
- thread.instance_eval { ActiveRecord::Base.connection_pool.release_connection }
- thread.wakeup if thread.status == 'sleep'
- thread.terminate
- end
- end
- def restart!
- without_alive_check do
- puts "--> Restarting #{id} at #{Time.now} <--"
- stop!
- setup!(scheduler, mutex)
- run!
- end
- end
- def every(*args, &blk)
- schedule(:every, args, &blk)
- end
- def cron(*args, &blk)
- schedule(:cron, args, &blk)
- end
- def schedule_in(*args, &blk)
- schedule(:schedule_in, args, &blk)
- end
- def boolify(value)
- agent.send(:boolify, value)
- end
- private
- def schedule(method, args, &blk)
- @scheduler.send(method, *args, tag: id, &blk)
- end
- def without_alive_check(&blk)
- @restarting = true
- yield
- ensure
- @restarting = false
- end
- end
- end
|