agent.rb 14 KB


  1. require 'utils'
  2. # Agent is the core class in Huginn, representing a configurable, schedulable, reactive system with memory that can
  3. # be sub-classed for many different purposes. Agents can emit Events, as well as receive them and react in many different ways.
  4. # The basic Agent API is detailed on the Huginn wiki: https://github.com/cantino/huginn/wiki/Creating-a-new-agent
  5. class Agent < ActiveRecord::Base
  6. include AssignableTypes
  7. include MarkdownClassAttributes
  8. include JSONSerializedField
  9. include RDBMSFunctions
  10. include WorkingHelpers
  11. include LiquidInterpolatable
  12. include HasGuid
  13. include LiquidDroppable
  14. include DryRunnable
  15. include SortableEvents
  16. markdown_class_attributes :description, :event_description
  17. load_types_in "Agents"
  18. SCHEDULES = %w[every_1m every_2m every_5m every_10m every_30m every_1h every_2h every_5h every_12h every_1d every_2d every_7d
  19. midnight 1am 2am 3am 4am 5am 6am 7am 8am 9am 10am 11am noon 1pm 2pm 3pm 4pm 5pm 6pm 7pm 8pm 9pm 10pm 11pm never]
  20. EVENT_RETENTION_SCHEDULES = [["Forever", 0], ['1 hour', 1.hour], ['6 hours', 6.hours], ["1 day", 1.day], *([2, 3, 4, 5, 7, 14, 21, 30, 45, 90, 180, 365].map {|n| ["#{n} days", n.days] })]
  21. attr_accessible :options, :memory, :name, :type, :schedule, :controller_ids, :control_target_ids, :disabled, :source_ids, :scenario_ids, :keep_events_for, :propagate_immediately, :drop_pending_events
  22. json_serialize :options, :memory
  23. validates_presence_of :name, :user
  24. validates_inclusion_of :keep_events_for, :in => EVENT_RETENTION_SCHEDULES.map(&:last)
  25. validate :sources_are_owned
  26. validate :controllers_are_owned
  27. validate :control_targets_are_owned
  28. validate :scenarios_are_owned
  29. validate :validate_schedule
  30. validate :validate_options
  31. after_initialize :set_default_schedule
  32. before_validation :set_default_schedule
  33. before_validation :unschedule_if_cannot_schedule
  34. before_save :unschedule_if_cannot_schedule
  35. before_create :set_last_checked_event_id
  36. after_save :possibly_update_event_expirations
  37. belongs_to :user, :inverse_of => :agents
  38. belongs_to :service, :inverse_of => :agents
  39. has_many :events, -> { order("events.id desc") }, :dependent => :delete_all, :inverse_of => :agent
  40. has_one :most_recent_event, -> { order("events.id desc") }, :inverse_of => :agent, :class_name => "Event"
  41. has_many :logs, -> { order("agent_logs.id desc") }, :dependent => :delete_all, :inverse_of => :agent, :class_name => "AgentLog"
  42. has_many :received_events, -> { order("events.id desc") }, :through => :sources, :class_name => "Event", :source => :events
  43. has_many :links_as_source, :dependent => :delete_all, :foreign_key => "source_id", :class_name => "Link", :inverse_of => :source
  44. has_many :links_as_receiver, :dependent => :delete_all, :foreign_key => "receiver_id", :class_name => "Link", :inverse_of => :receiver
  45. has_many :sources, :through => :links_as_receiver, :class_name => "Agent", :inverse_of => :receivers
  46. has_many :receivers, :through => :links_as_source, :class_name => "Agent", :inverse_of => :sources
  47. has_many :control_links_as_controller, dependent: :delete_all, foreign_key: 'controller_id', class_name: 'ControlLink', inverse_of: :controller
  48. has_many :control_links_as_control_target, dependent: :delete_all, foreign_key: 'control_target_id', class_name: 'ControlLink', inverse_of: :control_target
  49. has_many :controllers, through: :control_links_as_control_target, class_name: "Agent", inverse_of: :control_targets
  50. has_many :control_targets, through: :control_links_as_controller, class_name: "Agent", inverse_of: :controllers
  51. has_many :scenario_memberships, :dependent => :destroy, :inverse_of => :agent
  52. has_many :scenarios, :through => :scenario_memberships, :inverse_of => :agents
  53. scope :active, -> { where(disabled: false) }
  54. scope :inactive, -> { where(disabled: true) }
  55. scope :of_type, lambda { |type|
  56. type = case type
  57. when Agent
  58. type.class.to_s
  59. else
  60. type.to_s
  61. end
  62. where(:type => type)
  63. }
  64. def short_type
  65. type.demodulize
  66. end
  67. def check
  68. # Implement me in your subclass of Agent.
  69. end
  70. def default_options
  71. # Implement me in your subclass of Agent.
  72. {}
  73. end
  74. def receive(events)
  75. # Implement me in your subclass of Agent.
  76. end
  77. def is_form_configurable?
  78. false
  79. end
  80. def receive_web_request(params, method, format)
  81. # Implement me in your subclass of Agent.
  82. ["not implemented", 404]
  83. end
  84. # Implement me in your subclass to decide if your Agent is working.
  85. def working?
  86. raise "Implement me in your subclass"
  87. end
  88. def build_event(event)
  89. event = events.build(event) if event.is_a?(Hash)
  90. event.agent = self
  91. event.user = user
  92. event.expires_at ||= new_event_expiration_date
  93. event
  94. end
  95. def create_event(event)
  96. if can_create_events?
  97. event = build_event(event)
  98. event.save!
  99. event
  100. else
  101. error "This Agent cannot create events!"
  102. end
  103. end
  104. def credential(name)
  105. @credential_cache ||= {}
  106. if @credential_cache.has_key?(name)
  107. @credential_cache[name]
  108. else
  109. @credential_cache[name] = user.user_credentials.where(:credential_name => name).first.try(:credential_value)
  110. end
  111. end
  112. def reload
  113. @credential_cache = {}
  114. super
  115. end
  116. def new_event_expiration_date
  117. keep_events_for > 0 ? keep_events_for.seconds.from_now : nil
  118. end
  119. def update_event_expirations!
  120. if keep_events_for == 0
  121. events.update_all :expires_at => nil
  122. else
  123. events.update_all "expires_at = " + rdbms_date_add("created_at", "SECOND", keep_events_for.to_i)
  124. end
  125. end
  126. def trigger_web_request(params, method, format)
  127. if respond_to?(:receive_webhook)
  128. Rails.logger.warn "DEPRECATED: The .receive_webhook method is deprecated, please switch your Agent to use .receive_web_request."
  129. receive_webhook(params).tap do
  130. self.last_web_request_at = Time.now
  131. save!
  132. end
  133. else
  134. receive_web_request(params, method, format).tap do
  135. self.last_web_request_at = Time.now
  136. save!
  137. end
  138. end
  139. end
  140. def unavailable?
  141. disabled? || dependencies_missing?
  142. end
  143. def dependencies_missing?
  144. self.class.dependencies_missing?
  145. end
  146. def default_schedule
  147. self.class.default_schedule
  148. end
  149. def cannot_be_scheduled?
  150. self.class.cannot_be_scheduled?
  151. end
  152. def can_be_scheduled?
  153. !cannot_be_scheduled?
  154. end
  155. def cannot_receive_events?
  156. self.class.cannot_receive_events?
  157. end
  158. def can_receive_events?
  159. !cannot_receive_events?
  160. end
  161. def cannot_create_events?
  162. self.class.cannot_create_events?
  163. end
  164. def can_create_events?
  165. !cannot_create_events?
  166. end
  167. def can_control_other_agents?
  168. self.class.can_control_other_agents?
  169. end
  170. def can_dry_run?
  171. self.class.can_dry_run?
  172. end
  173. def no_bulk_receive?
  174. self.class.no_bulk_receive?
  175. end
  176. def log(message, options = {})
  177. AgentLog.log_for_agent(self, message, options)
  178. end
  179. def error(message, options = {})
  180. log(message, options.merge(:level => 4))
  181. end
  182. def delete_logs!
  183. logs.delete_all
  184. update_column :last_error_log_at, nil
  185. end
  186. def drop_pending_events
  187. false
  188. end
  189. def drop_pending_events=(bool)
  190. set_last_checked_event_id if bool
  191. end
  192. # Callbacks
  193. def set_default_schedule
  194. self.schedule = default_schedule unless schedule.present? || cannot_be_scheduled?
  195. end
  196. def unschedule_if_cannot_schedule
  197. self.schedule = nil if cannot_be_scheduled?
  198. end
  199. def set_last_checked_event_id
  200. if can_receive_events? && newest_event_id = Event.maximum(:id)
  201. self.last_checked_event_id = newest_event_id
  202. end
  203. end
  204. def possibly_update_event_expirations
  205. update_event_expirations! if keep_events_for_changed?
  206. end
  207. #Validation Methods
  208. private
  209. def sources_are_owned
  210. errors.add(:sources, "must be owned by you") unless sources.all? {|s| s.user_id == user_id }
  211. end
  212. def controllers_are_owned
  213. errors.add(:controllers, "must be owned by you") unless controllers.all? {|s| s.user_id == user_id }
  214. end
  215. def control_targets_are_owned
  216. errors.add(:control_targets, "must be owned by you") unless control_targets.all? {|s| s.user_id == user_id }
  217. end
  218. def scenarios_are_owned
  219. errors.add(:scenarios, "must be owned by you") unless scenarios.all? {|s| s.user_id == user_id }
  220. end
  221. def validate_schedule
  222. unless cannot_be_scheduled?
  223. errors.add(:schedule, "is not a valid schedule") unless SCHEDULES.include?(schedule.to_s)
  224. end
  225. end
  226. def validate_options
  227. # Implement me in your subclass to test for valid options.
  228. end
  229. # Utility Methods
  230. def boolify(option_value)
  231. case option_value
  232. when true, 'true'
  233. true
  234. when false, 'false'
  235. false
  236. else
  237. nil
  238. end
  239. end
  240. # Class Methods
  241. class << self
  242. def build_clone(original)
  243. new(original.slice(:type, :options, :schedule, :controller_ids, :control_target_ids,
  244. :source_ids, :keep_events_for, :propagate_immediately)) { |clone|
  245. # Give it a unique name
  246. 2.upto(count) do |i|
  247. name = '%s (%d)' % [original.name, i]
  248. unless exists?(name: name)
  249. clone.name = name
  250. break
  251. end
  252. end
  253. }
  254. end
  255. def cannot_be_scheduled!
  256. @cannot_be_scheduled = true
  257. end
  258. def cannot_be_scheduled?
  259. !!@cannot_be_scheduled
  260. end
  261. def default_schedule(schedule = nil)
  262. @default_schedule = schedule unless schedule.nil?
  263. @default_schedule
  264. end
  265. def cannot_create_events!
  266. @cannot_create_events = true
  267. end
  268. def cannot_create_events?
  269. !!@cannot_create_events
  270. end
  271. def cannot_receive_events!
  272. @cannot_receive_events = true
  273. end
  274. def cannot_receive_events?
  275. !!@cannot_receive_events
  276. end
  277. def can_control_other_agents?
  278. include? AgentControllerConcern
  279. end
  280. def can_dry_run!
  281. @can_dry_run = true
  282. end
  283. def can_dry_run?
  284. !!@can_dry_run
  285. end
  286. def no_bulk_receive!
  287. @no_bulk_receive = true
  288. end
  289. def no_bulk_receive?
  290. !!@no_bulk_receive
  291. end
  292. def gem_dependency_check
  293. @gem_dependencies_checked = true
  294. @gem_dependencies_met = yield
  295. end
  296. def dependencies_missing?
  297. @gem_dependencies_checked && !@gem_dependencies_met
  298. end
  299. # Find all Agents that have received Events since the last execution of this method. Update those Agents with
  300. # their new `last_checked_event_id` and queue each of the Agents to be called with #receive using `async_receive`.
  301. # This is called by bin/schedule.rb periodically.
  302. def receive!(options={})
  303. Agent.transaction do
  304. scope = Agent.
  305. select("agents.id AS receiver_agent_id, events.id AS event_id").
  306. joins("JOIN links ON (links.receiver_id = agents.id)").
  307. joins("JOIN agents AS sources ON (links.source_id = sources.id)").
  308. joins("JOIN events ON (events.agent_id = sources.id AND events.id > links.event_id_at_creation)").
  309. where("NOT agents.disabled AND (agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id)")
  310. if options[:only_receivers].present?
  311. scope = scope.where("agents.id in (?)", options[:only_receivers])
  312. end
  313. sql = scope.to_sql()
  314. agents_to_events = {}
  315. Agent.connection.select_rows(sql).each do |receiver_agent_id, event_id|
  316. agents_to_events[receiver_agent_id.to_i] ||= []
  317. agents_to_events[receiver_agent_id.to_i] << event_id
  318. end
  319. Agent.where(:id => agents_to_events.keys).each do |agent|
  320. event_ids = agents_to_events[agent.id].uniq
  321. agent.update_attribute :last_checked_event_id, event_ids.max
  322. if agent.no_bulk_receive?
  323. event_ids.each { |event_id| Agent.async_receive(agent.id, [event_id]) }
  324. else
  325. Agent.async_receive(agent.id, event_ids)
  326. end
  327. end
  328. {
  329. :agent_count => agents_to_events.keys.length,
  330. :event_count => agents_to_events.values.flatten.uniq.compact.length
  331. }
  332. end
  333. end
  334. # This method will enqueue an AgentReceiveJob job. It accepts Agent and Event ids instead of a literal ActiveRecord
  335. # models because it is preferable to serialize jobs with ids.
  336. def async_receive(agent_id, event_ids)
  337. AgentReceiveJob.perform_later(agent_id, event_ids)
  338. end
  339. # Given a schedule name, run `check` via `bulk_check` on all Agents with that schedule.
  340. # This is called by bin/schedule.rb for each schedule in `SCHEDULES`.
  341. def run_schedule(schedule)
  342. return if schedule == 'never'
  343. types = where(:schedule => schedule).group(:type).pluck(:type)
  344. types.each do |type|
  345. type.constantize.bulk_check(schedule)
  346. end
  347. end
  348. # Schedule `async_check`s for every Agent on the given schedule. This is normally called by `run_schedule` once
  349. # per type of agent, so you can override this to define custom bulk check behavior for your custom Agent type.
  350. def bulk_check(schedule)
  351. raise "Call #bulk_check on the appropriate subclass of Agent" if self == Agent
  352. where("agents.schedule = ? and disabled = false", schedule).pluck("agents.id").each do |agent_id|
  353. async_check(agent_id)
  354. end
  355. end
  356. # This method will enqueue an AgentCheckJob job. It accepts an Agent id instead of a literal Agent because it is
  357. # preferable to serialize job with ids, instead of with the full Agents.
  358. def async_check(agent_id)
  359. AgentCheckJob.perform_later(agent_id)
  360. end
  361. end
  362. end
  363. class AgentDrop
  364. def type
  365. @object.short_type
  366. end
  367. [
  368. :name,
  369. :type,
  370. :options,
  371. :memory,
  372. :sources,
  373. :receivers,
  374. :schedule,
  375. :controllers,
  376. :control_targets,
  377. :disabled,
  378. :keep_events_for,
  379. :propagate_immediately,
  380. ].each { |attr|
  381. define_method(attr) {
  382. @object.__send__(attr)
  383. } unless method_defined?(attr)
  384. }
  385. end