mqtt_agent.rb 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # encoding: utf-8
  2. require "json"
  3. module Agents
  4. class MqttAgent < Agent
  5. gem_dependency_check { defined?(MQTT) }
  6. description <<-MD
  7. The MQTT Agent allows both publication and subscription to an MQTT topic.
  8. #{'## Include `mqtt` in your Gemfile to use this Agent!' if dependencies_missing?}
  9. MQTT is a generic transport protocol for machine to machine communication.
  10. You can do things like:
  11. * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html)
  12. * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android
  13. * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html)
  14. Simply choose a topic (think email subject line) to publish/listen to, and configure your service.
  15. It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](http://www.cloudmqtt.com)
  16. Hints:
  17. Many services run mqtts (mqtt over SSL) often with a custom certificate.
  18. You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration.
  19. Example configuration:
  20. <pre><code>{
  21. 'uri' => 'mqtts://user:pass@localhost:8883'
  22. 'ssl' => :TLSv1,
  23. 'ca_file' => './ca.pem',
  24. 'cert_file' => './client.crt',
  25. 'key_file' => './client.key',
  26. 'topic' => 'huginn'
  27. }
  28. </code></pre>
  29. Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where
  30. temperature and other events are being published.
  31. <pre><code>{
  32. 'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
  33. 'topic' => 'the_thing_system/demo'
  34. }
  35. </code></pre>
  36. Subscribe to all topics
  37. <pre><code>{
  38. 'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
  39. 'topic' => '/#'
  40. }
  41. </code></pre>
  42. Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html)
  43. MD
  44. event_description <<-MD
  45. Events are simply nested MQTT payloads. For example, an MQTT payload for Owntracks
  46. <pre><code>{
  47. "topic": "owntracks/kcqlmkgx/Dan",
  48. "message": {"_type": "location", "lat": "-34.8493644", "lon": "138.5218119", "tst": "1401771049", "acc": "50.0", "batt": "31", "desc": "Home", "event": "enter"},
  49. "time": 1401771051
  50. }</code></pre>
  51. MD
  52. def validate_options
  53. unless options['uri'].present? &&
  54. options['topic'].present?
  55. errors.add(:base, "topic and uri are required")
  56. end
  57. end
  58. def working?
  59. (event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?) || received_event_without_error?
  60. end
  61. def default_options
  62. {
  63. 'uri' => 'mqtts://user:pass@localhost:8883',
  64. 'ssl' => :TLSv1,
  65. 'ca_file' => './ca.pem',
  66. 'cert_file' => './client.crt',
  67. 'key_file' => './client.key',
  68. 'topic' => 'huginn',
  69. 'max_read_time' => '10',
  70. 'expected_update_period_in_days' => '2'
  71. }
  72. end
  73. def mqtt_client
  74. @client ||= MQTT::Client.new(interpolated['uri'])
  75. if interpolated['ssl']
  76. @client.ssl = interpolated['ssl'].to_sym
  77. @client.ca_file = interpolated['ca_file']
  78. @client.cert_file = interpolated['cert_file']
  79. @client.key_file = interpolated['key_file']
  80. end
  81. @client
  82. end
  83. def receive(incoming_events)
  84. mqtt_client.connect do |c|
  85. incoming_events.each do |event|
  86. c.publish(interpolated(event)['topic'], event.payload['message'])
  87. end
  88. end
  89. end
  90. def check
  91. last_message = memory['last_message']
  92. mqtt_client.connect do |c|
  93. begin
  94. Timeout.timeout((interpolated['max_read_time'].presence || 15).to_i) {
  95. c.get_packet(interpolated['topic']) do |packet|
  96. topic, payload = message = [packet.topic, packet.payload]
  97. # Ignore a message if it is previously received
  98. next if (packet.retain || packet.duplicate) && message == last_message
  99. last_message = message
  100. # A lot of services generate JSON, so try that.
  101. begin
  102. payload = JSON.parse(payload)
  103. rescue
  104. end
  105. create_event payload: {
  106. 'topic' => topic,
  107. 'message' => payload,
  108. 'time' => Time.now.to_i
  109. }
  110. end
  111. }
  112. rescue Timeout::Error
  113. end
  114. end
  115. # Remember the last original (non-retain, non-duplicate) message
  116. self.memory['last_message'] = last_message
  117. save!
  118. end
  119. end
  120. end