123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- require "json"
- module Agents
- class MqttAgent < Agent
- gem_dependency_check { defined?(MQTT) }
- description <<~MD
- The MQTT Agent allows both publication and subscription to an MQTT topic.
- #{'## Include `mqtt` in your Gemfile to use this Agent!' if dependencies_missing?}
- MQTT is a generic transport protocol for machine to machine communication.
- You can do things like:
- * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html)
- * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android
- * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html)
- Simply choose a topic (think email subject line) to publish/listen to, and configure your service.
- It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](http://www.cloudmqtt.com)
- Hints:
- Many services run mqtts (mqtt over SSL) often with a custom certificate.
- You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration.
- Example configuration:
- <pre><code>{
- 'uri' => 'mqtts://user:pass@localhost:8883'
- 'ssl' => :TLSv1,
- 'ca_file' => './ca.pem',
- 'cert_file' => './client.crt',
- 'key_file' => './client.key',
- 'topic' => 'huginn'
- }
- </code></pre>
- Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where
- temperature and other events are being published.
- <pre><code>{
- 'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
- 'topic' => 'the_thing_system/demo'
- }
- </code></pre>
- Subscribe to all topics
- <pre><code>{
- 'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
- 'topic' => '/#'
- }
- </code></pre>
- Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html)
- MD
- event_description <<~MD
- Events are simply nested MQTT payloads. For example, an MQTT payload for Owntracks
- {
- "topic": "owntracks/kcqlmkgx/Dan",
- "message": {"_type": "location", "lat": "-34.8493644", "lon": "138.5218119", "tst": "1401771049", "acc": "50.0", "batt": "31", "desc": "Home", "event": "enter"},
- "time": 1401771051
- }
- MD
- def validate_options
- unless options['uri'].present? &&
- options['topic'].present?
- errors.add(:base, "topic and uri are required")
- end
- end
- def working?
- (event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?) || received_event_without_error?
- end
- def default_options
- {
- 'uri' => 'mqtts://user:pass@localhost:8883',
- 'ssl' => :TLSv1,
- 'ca_file' => './ca.pem',
- 'cert_file' => './client.crt',
- 'key_file' => './client.key',
- 'topic' => 'huginn',
- 'max_read_time' => '10',
- 'expected_update_period_in_days' => '2'
- }
- end
- def mqtt_client
- @client ||= MQTT::Client.new(interpolated['uri']).tap { |c|
- if interpolated['ssl']
- c.ssl = interpolated['ssl'].to_sym
- c.ca_file = interpolated['ca_file']
- c.cert_file = interpolated['cert_file']
- c.key_file = interpolated['key_file']
- end
- }
- end
- def receive(incoming_events)
- mqtt_client.connect do |c|
- incoming_events.each do |event|
- c.publish(interpolated(event)['topic'], event.payload['message'])
- end
- end
- end
- def check
- last_message = memory['last_message']
- mqtt_client.connect
- poll_thread = Thread.new do
- mqtt_client.get_packet(interpolated['topic']) do |packet|
- topic, payload = message = [packet.topic, packet.payload]
- # Ignore a message if it is previously received
- next if (packet.retain || packet.duplicate) && message == last_message
- last_message = message
- # A lot of services generate JSON, so try that.
- begin
- payload = JSON.parse(payload)
- rescue StandardError
- end
- create_event payload: {
- 'topic' => topic,
- 'message' => payload,
- 'time' => Time.now.to_i
- }
- end
- end
- sleep (interpolated['max_read_time'].presence || 15).to_f
- mqtt_client.disconnect
- poll_thread.kill
- # Remember the last original (non-retain, non-duplicate) message
- self.memory['last_message'] = last_message
- save!
- end
- end
- end
|