require 'rufus/scheduler' class Rufus::Scheduler SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name class Job # Store an ID of SchedulerAgent in this job. def scheduler_agent_id=(id) self[:scheduler_agent_id] = id end # Extract an ID of SchedulerAgent if any. def scheduler_agent_id self[:scheduler_agent_id] end # Return a SchedulerAgent tied to this job. Return nil if it is # not found or disabled. def scheduler_agent agent_id = scheduler_agent_id or return nil Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id) end end # Get all jobs tied to any SchedulerAgent def scheduler_agent_jobs jobs(tag: SCHEDULER_AGENT_TAG) end # Get a job tied to a given SchedulerAgent def scheduler_agent_job(agent) scheduler_agent_jobs.find { |job| job.scheduler_agent_id == agent.id } end # Schedule or reschedule a job for a given SchedulerAgent and return # the running job. Return nil if unscheduled. def schedule_scheduler_agent(agent) job = scheduler_agent_job(agent) if agent.unavailable? if job puts "Unscheduling SchedulerAgent##{agent.id} (disabled)" job.unschedule end nil else if job return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i puts "Rescheduling SchedulerAgent##{agent.id}" job.unschedule else puts "Scheduling SchedulerAgent##{agent.id}" end agent_id = agent.id job = schedule_cron agent.options['schedule'], tag: SCHEDULER_AGENT_TAG do |job| job.scheduler_agent_id = agent_id if scheduler_agent = job.scheduler_agent scheduler_agent.control! else puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (disabled or deleted)" job.unschedule end end # Make sure the job is associated with a SchedulerAgent before # it is triggered. job.scheduler_agent_id = agent_id agent.memory['scheduled_at'] = job.scheduled_at.to_i agent.save job end end # Schedule or reschedule jobs for all SchedulerAgents and unschedule # orphaned jobs if any. def schedule_scheduler_agents scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent| schedule_scheduler_agent(scheduler_agent) }.compact (scheduler_agent_jobs - scheduled_jobs).each { |job| puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)" job.unschedule } end end class HuginnScheduler < LongRunnable::Worker include LongRunnable FAILED_JOBS_TO_KEEP = 100 SCHEDULE_TO_CRON = { '1m' => '*/1 * * * *', '2m' => '*/2 * * * *', '5m' => '*/5 * * * *', '10m' => '*/10 * * * *', '30m' => '*/30 * * * *', '1h' => '0 * * * *', '2h' => '0 */2 * * *', '5h' => '0 */5 * * *', '12h' => '0 */12 * * *', '1d' => '0 0 * * *', '2d' => '0 0 */2 * *', '7d' => '0 0 * * 1', } def setup Time.zone = ENV['TIMEZONE'].presence || 'Pacific Time (US & Canada)' tzinfo_friendly_timezone = Time.zone.tzinfo.identifier # Schedule event propagation. every '1m' do propagate! end # Schedule event cleanup. every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do cleanup_expired_events! end # Schedule failed job cleanup. every '1h' do cleanup_failed_jobs! end # Schedule repeating events. SCHEDULE_TO_CRON.keys.each do |schedule| cron "#{SCHEDULE_TO_CRON[schedule]} #{tzinfo_friendly_timezone}" do run_schedule "every_#{schedule}" end end # Schedule events for specific times. 24.times do |hour| cron "0 #{hour} * * * " + tzinfo_friendly_timezone do run_schedule hour_to_schedule_name(hour) end end # Schedule Scheduler Agents every '1m' do @scheduler.schedule_scheduler_agents end end def run @scheduler.join end def self.setup_worker [new(id: self.to_s)] end private def run_schedule(time) with_mutex do puts "Queuing schedule for #{time}" AgentRunScheduleJob.perform_later(time) end end def propagate! with_mutex do return unless AgentPropagateJob.can_enqueue? puts "Queuing event propagation" AgentPropagateJob.perform_later end end def cleanup_expired_events! with_mutex do puts "Running event cleanup" AgentCleanupExpiredJob.perform_later end end def cleanup_failed_jobs! num_to_keep = (ENV['FAILED_JOBS_TO_KEEP'].presence || FAILED_JOBS_TO_KEEP).to_i first_to_delete = Delayed::Job.where.not(failed_at: nil).order("failed_at DESC").offset(num_to_keep).limit(1).pluck(:failed_at).first Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present? end def hour_to_schedule_name(hour) if hour == 0 "midnight" elsif hour < 12 "#{hour}am" elsif hour == 12 "noon" else "#{hour - 12}pm" end end def with_mutex mutex.synchronize do ActiveRecord::Base.connection_pool.with_connection do yield end end end end