huginn_scheduler.rb 5.1 KB


  1. require 'rufus/scheduler'
  # Extensions to Rufus::Scheduler that let a scheduler own one cron job per
  # SchedulerAgent. Every such job carries the SCHEDULER_AGENT_TAG tag and
  # stores the owning agent's ID under its :scheduler_agent_id key.
  class Rufus::Scheduler
    # Tag attached to every job created on behalf of a SchedulerAgent.
    SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name

    class Job
      # Record the ID of the SchedulerAgent that owns this job.
      def scheduler_agent_id=(id)
        self[:scheduler_agent_id] = id
      end

      # ID of the owning SchedulerAgent, or nil when none was recorded.
      def scheduler_agent_id
        self[:scheduler_agent_id]
      end

      # Look up the active SchedulerAgent that owns this job.
      #
      # Returns nil when no ID was recorded, or when no active
      # SchedulerAgent with that ID can be found.
      def scheduler_agent
        owner_id = scheduler_agent_id
        return nil unless owner_id

        Agent.of_type(Agents::SchedulerAgent).active.find_by(id: owner_id)
      end
    end

    # All jobs carrying the SchedulerAgent tag.
    def scheduler_agent_jobs
      jobs(tag: SCHEDULER_AGENT_TAG)
    end

    # The job owned by +agent+, or nil if there is none.
    def scheduler_agent_job(agent)
      scheduler_agent_jobs.find do |candidate|
        candidate.scheduler_agent_id == agent.id
      end
    end

    # Bring the cron job for +agent+ in line with the agent's current state.
    #
    # * Unavailable agent: any existing job is unscheduled; returns nil.
    # * Existing job whose scheduled_at matches agent.memory['scheduled_at']:
    #   nothing to do; that job is returned as is.
    # * Otherwise any existing job is dropped and a fresh cron job is created
    #   from agent.options['schedule']; the new job is returned.
    def schedule_scheduler_agent(agent)
      current = scheduler_agent_job(agent)

      if agent.unavailable?
        if current
          puts "Unscheduling SchedulerAgent##{agent.id} (disabled)"
          current.unschedule
        end
        return nil
      end

      if current
        return current if agent.memory['scheduled_at'] == current.scheduled_at.to_i

        puts "Rescheduling SchedulerAgent##{agent.id}"
        current.unschedule
      else
        puts "Scheduling SchedulerAgent##{agent.id}"
      end

      agent_id = agent.id

      fresh = schedule_cron(agent.options['schedule'], tag: SCHEDULER_AGENT_TAG) do |fired|
        fired.scheduler_agent_id = agent_id

        owner = fired.scheduler_agent
        if owner
          owner.control!
        else
          puts "Unscheduling SchedulerAgent##{fired.scheduler_agent_id} (disabled or deleted)"
          fired.unschedule
        end
      end

      # Tie the job to its agent right away, so the association exists even
      # before the job fires for the first time.
      fresh.scheduler_agent_id = agent_id

      # Remember when this job was scheduled; the comparison above uses it to
      # detect that the job is already up to date.
      agent.memory['scheduled_at'] = fresh.scheduled_at.to_i
      agent.save

      fresh
    end

    # Schedule or reschedule a job for every SchedulerAgent, then unschedule
    # tagged jobs that no longer belong to any of them.
    def schedule_scheduler_agents
      kept = Agent.of_type(Agents::SchedulerAgent).map do |scheduler_agent|
        schedule_scheduler_agent(scheduler_agent)
      end.compact

      orphaned = scheduler_agent_jobs - kept
      orphaned.each do |orphan|
        puts "Unscheduling SchedulerAgent##{orphan.scheduler_agent_id} (orphaned)"
        orphan.unschedule
      end
    end
  end
  # Worker that registers all of Huginn's periodic work with its scheduler:
  # event propagation, expired-event cleanup, failed-job cleanup, the named
  # agent schedules ("every_5m", "3pm", ...) and SchedulerAgent cron jobs.
  class HuginnScheduler < LongRunnable::Worker
    include LongRunnable

    # Default number of failed jobs to retain; overridable through the
    # FAILED_JOBS_TO_KEEP environment variable.
    FAILED_JOBS_TO_KEEP = 100

    # Maps a repeating schedule suffix (used as "every_<suffix>") to the
    # cron line that triggers it.
    SCHEDULE_TO_CRON = {
      '1m' => '*/1 * * * *',
      '2m' => '*/2 * * * *',
      '5m' => '*/5 * * * *',
      '10m' => '*/10 * * * *',
      '30m' => '*/30 * * * *',
      '1h' => '0 * * * *',
      '2h' => '0 */2 * * *',
      '5h' => '0 */5 * * *',
      '12h' => '0 */12 * * *',
      '1d' => '0 0 * * *',
      '2d' => '0 0 */2 * *',
      '7d' => '0 0 * * 1',
    }.freeze

    # Register every recurring job. Cron lines are suffixed with the tzinfo
    # identifier of the configured time zone (ENV['TIMEZONE'], defaulting to
    # Pacific Time).
    def setup
      Time.zone = ENV['TIMEZONE'].presence || 'Pacific Time (US & Canada)'
      tzinfo_friendly_timezone = Time.zone.tzinfo.identifier

      # Schedule event propagation.
      every '1m' do
        propagate!
      end

      # Schedule event cleanup.
      every ENV['EVENT_EXPIRATION_CHECK'].presence || '6h' do
        cleanup_expired_events!
      end

      # Schedule failed job cleanup.
      every '1h' do
        cleanup_failed_jobs!
      end

      # Schedule repeating events.
      SCHEDULE_TO_CRON.each do |schedule, cron_line|
        cron "#{cron_line} #{tzinfo_friendly_timezone}" do
          run_schedule "every_#{schedule}"
        end
      end

      # Schedule events for specific times.
      24.times do |hour|
        cron "0 #{hour} * * * #{tzinfo_friendly_timezone}" do
          run_schedule hour_to_schedule_name(hour)
        end
      end

      # Schedule Scheduler Agents. This queries and saves agents, so it runs
      # under with_mutex like the other database-touching jobs; that way the
      # connection it uses goes back to the pool when the run ends.
      every '1m' do
        with_mutex do
          @scheduler.schedule_scheduler_agents
        end
      end
    end

    # Block until the underlying scheduler stops.
    def run
      @scheduler.join
    end

    # Build the single worker instance, identified by the class name.
    def self.setup_worker
      [new(id: self.to_s)]
    end

    private

    # Enqueue a run of all agents on the named schedule (e.g. "every_1h").
    def run_schedule(time)
      with_mutex do
        puts "Queuing schedule for #{time}"
        AgentRunScheduleJob.perform_later(time)
      end
    end

    # Enqueue event propagation unless AgentPropagateJob reports that it
    # cannot be enqueued right now.
    def propagate!
      with_mutex do
        return unless AgentPropagateJob.can_enqueue?

        puts "Queuing event propagation"
        AgentPropagateJob.perform_later
      end
    end

    # Enqueue the job that removes expired events.
    def cleanup_expired_events!
      with_mutex do
        puts "Running event cleanup"
        AgentCleanupExpiredJob.perform_later
      end
    end

    # Delete all but the most recently failed delayed jobs. The number kept
    # comes from ENV['FAILED_JOBS_TO_KEEP'], falling back to
    # FAILED_JOBS_TO_KEEP.
    #
    # Runs under with_mutex, like the other jobs in this class, so the
    # queries go through connection_pool.with_connection.
    def cleanup_failed_jobs!
      with_mutex do
        num_to_keep = (ENV['FAILED_JOBS_TO_KEEP'].presence || FAILED_JOBS_TO_KEEP).to_i

        # failed_at of the newest failed job that falls outside the window
        # to keep; nil when there are not more than num_to_keep failed jobs.
        first_to_delete = Delayed::Job.where.not(failed_at: nil)
                                      .order("failed_at DESC")
                                      .offset(num_to_keep)
                                      .limit(1)
                                      .pluck(:failed_at)
                                      .first

        Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present?
      end
    end

    # Translate an hour of the day (0..23) into its schedule name:
    # 0 => "midnight", 1..11 => "1am".."11am", 12 => "noon",
    # 13..23 => "1pm".."11pm".
    def hour_to_schedule_name(hour)
      if hour == 0
        "midnight"
      elsif hour < 12
        "#{hour}am"
      elsif hour == 12
        "noon"
      else
        "#{hour - 12}pm"
      end
    end

    # Run the block while holding the worker's mutex and with a database
    # connection taken from the ActiveRecord connection pool for the
    # duration of the block.
    def with_mutex
      mutex.synchronize do
        ActiveRecord::Base.connection_pool.with_connection do
          yield
        end
      end
    end
  end