Browse Source

Merge pull request #368 from CloCkWeRX/add_mqtt_support

Add simple MQTT support to subscribe to, publish messages.
Andrew Cantino 10 years ago
parent
commit
d66015c403
5 changed files with 331 additions and 0 deletions
  1. 2 0
      Gemfile
  2. 2 0
      Gemfile.lock
  3. 138 0
      app/models/agents/mqtt_agent.rb
  4. 52 0
      spec/models/agents/mqtt_agent_spec.rb
  5. 137 0
      spec/support/fake_mqtt_server.rb

+ 2 - 0
Gemfile

@@ -74,6 +74,8 @@ gem 'slack-notifier', '~> 0.5.0'
 
 gem 'therubyracer', '~> 0.12.1'
 
+gem 'mqtt'
+
 group :development do
   gem 'binding_of_caller'
   gem 'better_errors'

+ 2 - 0
Gemfile.lock

@@ -160,6 +160,7 @@ GEM
     mime-types (1.25.1)
     mini_portile (0.5.3)
     minitest (5.3.3)
+    mqtt (0.2.0)
     multi_json (1.9.3)
     multi_xml (0.5.5)
     multipart-post (2.0.0)
@@ -341,6 +342,7 @@ DEPENDENCIES
   kaminari (~> 0.15.1)
   kramdown (~> 1.3.3)
   liquid (~> 2.6.1)
+  mqtt
   mysql2 (~> 0.3.15)
   nokogiri (~> 1.6.1)
   protected_attributes (~> 1.0.7)

+ 138 - 0
app/models/agents/mqtt_agent.rb

@@ -0,0 +1,138 @@
+# encoding: utf-8 
+require "mqtt"
+require "json"
+
+module Agents
+  class MqttAgent < Agent
+    description <<-MD
+      The MQTT agent allows both publication and subscription to an MQTT topic.
+
+      MQTT is a generic transport protocol for machine to machine communication.
+
+      You can do things like:
+
+       * Publish to [RabbitMQ](http://www.rabbitmq.com/mqtt.html)
+       * Run [OwnTracks, a location tracking tool](http://owntracks.org/) for iOS and Android
+       * Subscribe to your home automation setup like [Ninjablocks](http://forums.ninjablocks.com/index.php?p=/discussion/661/today-i-learned-about-mqtt/p1) or [TheThingSystem](http://thethingsystem.com/dev/supported-things.html)
+
+      Simply choose a topic (think email subject line) to publish/listen to, and configure your service.
+
+      It's easy to setup your own [broker](http://jpmens.net/2013/09/01/installing-mosquitto-on-a-raspberry-pi/) or connect to a [cloud service](www.cloudmqtt.com)
+
+      Hints:
+      Many services run mqtts (mqtt over SSL) often with a custom certificate.
+
+      You'll want to download their cert and install it locally, specifying the ```certificate_path``` configuration.
+
+
+      Example configuration:
+
+      <pre><code>{
+        'uri' => 'mqtts://user:pass@locahost:8883'
+        'ssl' => :TLSv1,
+        'ca_file' => './ca.pem',
+        'cert_file' => './client.crt',
+        'key_file' => './client.key',
+        'topic' => 'huginn'
+      }
+      </code></pre>
+
+      Subscribe to CloCkWeRX's TheThingSystem instance (thethingsystem.com), where
+      temperature and other events are being published.
+
+      <pre><code>{
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
+        'topic' => 'the_thing_system/demo'
+      }
+      </code></pre>
+
+      Subscribe to all topics
+      <pre><code>{
+        'uri' => 'mqtt://kcqlmkgx:sVNoccqwvXxE@m10.cloudmqtt.com:13858',
+        'topic' => '/#'
+      }
+      </code></pre>
+
+      Find out more detail on [subscription wildcards](http://www.eclipse.org/paho/files/mqttdoc/Cclient/wildcard.html)
+    MD
+
+    event_description <<-MD
+      Events are simply nested MQTT payloads. For example, an MQTT payload for Owntracks
+
+      <pre><code>{
+        "topic": "owntracks/kcqlmkgx/Dan",
+        "message": {"_type": "location", "lat": "-34.8493644", "lon": "138.5218119", "tst": "1401771049", "acc": "50.0", "batt": "31", "desc": "Home", "event": "enter"},
+        "time": 1401771051
+      }</code></pre>
+    MD
+
+    def validate_options
+      unless options['uri'].present? &&
+        options['topic'].present?
+        errors.add(:base, "topic and uri are required")
+      end
+    end
+
+    def working?
+      event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs?
+    end
+
+    def default_options
+      {
+        'uri' => 'mqtts://user:pass@locahost:8883',
+        'ssl' => :TLSv1,
+        'ca_file'  => './ca.pem',
+        'cert_file' => './client.crt',
+        'key_file' => './client.key',
+        'topic' => 'huginn',
+        'max_read_time' => '10'
+      }
+    end
+
+    def mqtt_client
+      @client ||= MQTT::Client.new(options['uri'])
+
+      if options['ssl']
+        @client.ssl = options['ssl'].to_sym
+        @client.ca_file = options['ca_file']
+        @client.cert_file = options['cert_file']
+        @client.key_file = options['key_file']
+      end
+
+      @client
+    end
+
+    def receive(incoming_events)
+      mqtt_client.connect do |c|
+        incoming_events.each do |event|
+          c.publish(options['topic'], payload)
+        end
+
+        c.disconnect
+      end
+    end
+
+
+    def check
+      mqtt_client.connect do |c|
+
+        Timeout::timeout((options['max_read_time'].presence || 15).to_i) {
+          c.get(options['topic']) do |topic, message|
+
+            # A lot of services generate JSON. Try that first
+            payload = JSON.parse(message) rescue message
+
+            create_event :payload => { 
+              'topic' => topic, 
+              'message' => payload, 
+              'time' => Time.now.to_i 
+            }
+          end
+        } rescue TimeoutError
+
+        c.disconnect   
+      end
+    end
+
+  end
+end

+ 52 - 0
spec/models/agents/mqtt_agent_spec.rb

@@ -0,0 +1,52 @@
+require 'spec_helper'
+require 'mqtt'
+require './spec/support/fake_mqtt_server'
+
+describe Agents::MqttAgent do
+
+  before :each do
+    @error_log = StringIO.new
+
+    @server = MQTT::FakeServer.new(41234, '127.0.0.1')
+    @server.just_one = true
+    @server.logger = Logger.new(@error_log)
+    @server.logger.level = Logger::DEBUG
+    @server.start
+
+    @valid_params = {
+      'uri' => "mqtt://#{@server.address}:#{@server.port}",
+      'topic' => '/#',
+      'max_read_time' => '1',
+      'expected_update_period_in_days' => "2"
+    }
+
+    @checker = Agents::MqttAgent.new(
+      :name => "somename", 
+      :options => @valid_params, 
+      :schedule => "midnight",
+    )
+    @checker.user = users(:jane)
+    @checker.save!
+  end
+
+  after :each do
+    @server.stop
+  end
+
+  describe "#check" do
+    it "should check that initial run creates an event" do
+      expect { @checker.check }.to change { Event.count }.by(2)
+    end
+  end
+
+  describe "#working?" do
+    it "checks if its generating events as scheduled" do
+      @checker.should_not be_working
+      @checker.check
+      @checker.reload.should be_working
+      three_days_from_now = 3.days.from_now
+      stub(Time).now { three_days_from_now }
+      @checker.should_not be_working
+    end
+  end
+end

+ 137 - 0
spec/support/fake_mqtt_server.rb

@@ -0,0 +1,137 @@
+#!/usr/bin/env ruby
+#
+# This is a 'fake' MQTT server to help with testing client implementations
+#
+# See https://github.com/njh/ruby-mqtt/blob/master/spec/fake_server.rb
+#
+# It behaves in the following ways:
+#   * Responses to CONNECT with a successful CONACK
+#   * Responses to PUBLISH by echoing the packet back
+#   * Responses to SUBSCRIBE with SUBACK and a PUBLISH to the topic
+#   * Responses to PINGREQ with PINGRESP
+#   * Responses to DISCONNECT by closing the socket
+#
+# It has the following restrictions
+#   * Doesn't deal with timeouts
+#   * Only handles a single connection at a time
+#
+
+$:.unshift File.dirname(__FILE__)+'/../lib'
+
+require 'logger'
+require 'socket'
+require 'mqtt'
+
+
+class MQTT::FakeServer
+  attr_reader :address, :port
+  attr_reader :last_publish
+  attr_reader :thread
+  attr_reader :pings_received
+  attr_accessor :just_one
+  attr_accessor :logger
+
+  # Create a new fake MQTT server
+  #
+  # If no port is given, bind to a random port number
+  # If no bind address is given, bind to localhost
+  def initialize(port=nil, bind_address='127.0.0.1')
+    @port = port
+    @address = bind_address
+  end
+
+  # Get the logger used by the server
+  def logger
+    @logger ||= Logger.new(STDOUT)
+  end
+
+  # Start the thread and open the socket that will process client connections
+  def start
+    @socket ||= TCPServer.new(@address, @port)
+    @address = @socket.addr[3]
+    @port = @socket.addr[1]
+    @thread ||= Thread.new do
+      logger.info "Started a fake MQTT server on #{@address}:#{@port}"
+      loop do
+        # Wait for a client to connect
+        client = @socket.accept
+        @pings_received = 0
+        handle_client(client)
+        break if just_one
+      end
+    end
+  end
+
+  # Stop the thread and close the socket
+  def stop
+    logger.info "Stopping fake MQTT server"
+    @socket.close unless @socket.nil?
+    @socket = nil
+
+    @thread.kill if @thread and @thread.alive?
+    @thread = nil
+  end
+
+  # Start the server thread and wait for it to finish (possibly never)
+  def run
+    start
+    begin
+      @thread.join
+    rescue Interrupt
+      stop
+    end
+  end
+
+
+  protected
+
+  # Given a client socket, process MQTT packets from the client
+  def handle_client(client)
+    loop do
+      packet = MQTT::Packet.read(client)
+      logger.debug packet.inspect
+
+      case packet
+        when MQTT::Packet::Connect
+          client.write MQTT::Packet::Connack.new(:return_code => 0)
+        when MQTT::Packet::Publish
+          client.write packet
+          @last_publish = packet
+        when MQTT::Packet::Subscribe
+          client.write MQTT::Packet::Suback.new(
+            :message_id => packet.message_id,
+            :granted_qos => 0
+          )
+          topic = packet.topics[0][0]
+          client.write MQTT::Packet::Publish.new(
+            :topic => topic,
+            :payload => "hello #{topic}",
+            :retain => true
+          )
+          client.write MQTT::Packet::Publish.new(
+            :topic => topic,
+            :payload => "did you know about #{topic}",
+            :retain => true
+          )
+
+        when MQTT::Packet::Pingreq
+          client.write MQTT::Packet::Pingresp.new
+          @pings_received += 1
+        when MQTT::Packet::Disconnect
+          client.close
+        break
+      end
+    end
+
+    rescue MQTT::ProtocolException => e
+      logger.warn "Protocol error, closing connection: #{e}"
+      client.close
+  end
+
+end
+
+if __FILE__ == $0
+  server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT)
+  server.logger.level = Logger::DEBUG
+  server.run
+end