Mosquitto

From air
Revision as of 10:19, 9 November 2013 by Donsez (talk | contribs) (→‎Examples)
Jump to navigation Jump to search
The printable version is no longer supported and may have rendering errors. Please update your browser bookmarks and please use the default browser print function instead.

http://mosquitto.org/

Serveur MQTT écrit en C et C++

Installation rapide et simple


Premiers pas

Installation

sur Raspberry Pi

wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py -O - | python
sudo python get-pip.py

sur MacOS X

brew install mosquitto


sur Ubuntu

http://mosquitto.org/2013/01/mosquitto-debian-repository/

Lancement du broker

Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto )

mosquitto

Publication & Souscription

Command lines

Publication en CLI

Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html

BROKER=10.0.1.3
mosquitto_pub -h $BROKER -d -t arduino/temp -m "100"

Souscription en CLI

Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html

BROKER=10.0.1.3
mosquitto_sub -h $BROKER -d -t arduino/temp

Python

Publication en Python

RPI + Arduino running Mosquitto MQTT publisher
Agrisensor pushing MQTT messages
Car park sensor pushing MQTT messages

Publication depuis le Raspberry Pi (et un Arduino) http://mosquitto.org/documentation/python/

BROKER=10.0.1.3
python publisharduino.py $BROKER
#!/usr/bin/python
#
# simple app to read string from serial port (arduino board)
# and publish via MQTT
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15

import serial
import mosquitto
import os
import sys

broker=sys.argv[1]
port = 1883

serialdev = '/dev/ttyACM0'



#MQTT callbacks

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")
    else:
        raise Exception


def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")


def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")


def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")


def on_unsubscribe(mosq, obj, mid):
    print("Unsubscribe with mid "+str(mid)+" received.")


def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)


#called on exit
#close serial, disconnect MQTT
def cleanup():
    print "Ending and cleaning up"
    ser.close()
    mqttc.disconnect()

try:
    print "Connecting... ", serialdev
    #connect to serial port
    ser = serial.Serial(serialdev, 9600, timeout=20)
except:
    print "Failed to connect serial"
    #unable to continue with no serial input
    raise SystemExit


try:
    ser.flushInput()
    #create an mqtt client
    mypid = os.getpid()
    client_uniq = "arduino_pub_"+str(mypid)
    mqttc = mosquitto.Mosquitto(client_uniq)

    #attach MQTT callbacks
    mqttc.on_connect = on_connect
    mqttc.on_disconnect = on_disconnect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe
    #mqttc.on_message = on_message

    #connect to broker
    mqttc.connect(broker, port, 60)

    #remain connected to broker
    #read data from serial and publish
    while mqttc.loop() == 0:
        line = ser.readline()
        #split line as it contains V,temp
        list = line.split(",")
        #second list element is temp
        temp = list[0].rstrip()
        print("Temp is "+temp)
        mqttc.publish("arduino/temp", temp)
        pass


# handle list index error (i.e. assume no data received)
except (IndexError):
    print "No data received within serial timeout period"
    cleanup()
# handle app closure
except (KeyboardInterrupt):
    print "Interrupt received"
    cleanup()
except (RuntimeError):
    print "uh-oh! time to die"
    cleanup()


Souscription en Python

Souscription depuis l'hôte http://mosquitto.org/documentation/python/

BROKER=10.0.1.3
python subscribearduino.py $BROKER
#!/usr/bin/python
#
# simple app to subscribe MQTT topic
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15

import os
import mosquitto
import sys

broker=sys.argv[1]
#broker = "10.0.1.3"
port = 1883

serialdev = '/dev/ttyACM0'

#MQTT callbacks

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")
    else:
        raise Exception

def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")


def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")


def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")


def on_unsubscribe(mosq, obj, mid):
    print("Unsubscribe with mid "+str(mid)+" received.")


def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)


#called on exit
# disconnect MQTT
def cleanup():
    print "Ending and cleaning up"
    mqttc.disconnect()

try:
    #create an mqtt client
    mypid = os.getpid()
    client_uniq = "arduino_pub_"+str(mypid)
    mqttc = mosquitto.Mosquitto(client_uniq)

    #attach MQTT callbacks
    mqttc.on_connect = on_connect
    mqttc.on_disconnect = on_disconnect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe
    mqttc.on_message = on_message

    #connect to broker
    mqttc.connect(broker, port, 60)

    mqttc.subscribe("arduino/temp")

    #remain connected to broker
    while mqttc.loop() == 0:
        pass


# handle app closure
except (KeyboardInterrupt):
    print "Interrupt received"
    cleanup()
except (RuntimeError):
    print "uh-oh! time to die"
    cleanup()

Node.js

Souscription avec Node.js

npm install mqtt
BROKER=10.0.1.3
node subscribe.js $BROKER 1883 arduino/temp
#!/usr/bin/env node

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 4; i++) {
  if(!argv[i]) process.exit(-1);
}

var port = argv[3]
  , host = argv[2]
  , topic = argv[4];

var c = mqtt.createClient(port, host);

c.on('connect', function() {

  c.on('message', function(topic, message) {
    console.log(topic + ' ' + message);
  });

  c.subscribe(topic);
});


Simple Souscripteur avec une interface Web

npm install mqtt
npm install express
BROKER=10.0.1.3
node webmqttsub.js $BROKER 1883 arduino/#

node webmqttsub.js $BROKER 1883 bbc/#
curl http://localhost:3000/*
curl http://localhost:3000/arduino/
curl http://localhost:3000/arduino/temp


// Simple Gateway between MQTT et HTTP (REST)

// Author: Didier Donsez, 2013
// TODO add PUT, POST (publish), DELETE method

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 4; i++) {
  if(!argv[i]) process.exit(-1);
}

var brokerport = argv[3]
  , brokerhost = argv[2]
  , subscribedtopicpattern = argv[4];

var topics = new Object();

var c = mqtt.createClient(brokerport, brokerhost);

c.on('connect', function() {

  c.on('message', function(topic, message) {
    console.log("receive "+ topic + ' ' + message);
      topics[topic]=message;
  });

  console.log("subscribe to " + subscribedtopicpattern);
  c.subscribe(subscribedtopicpattern);
});

var express = require('express');
var app = express();
app.use(function(req, res, next){
    var path=req.path;
    console.log('req.path=' + path);
    if(path=="/*"){
	res.set('Content-Type', 'application/json');
	res.send(JSON.stringify(topics));
     } else {
         var topic=path.substring(1);
	 if (topics.hasOwnProperty(topic))  {
	     if(req.get('Content-Type')=='application/json'){
		 res.set('Content-Type', 'application/json');
		 res.send( '{"'+topic+'","'+topics[topic]+'"}' );
	     } else { 
		 res.set('Content-Type', 'text/plain');
		 res.send(topics[topic]);
	     }
	 } else {
	     res.send(404, 'Sorry, we cannot find topic '+ topic);
	 }
     }
});
app.listen(3000);


Publication avec Node.js

#!/usr/bin/env node

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 5; i++) {
  if(!argv[i]) process.exit(-1);
}

var port = argv[3]
  , host = argv[2]
  , topic = argv[4];
  , message = argv[5];

var c = mqtt.createClient(port, host);

c.on('connect', function() {

  c.on('subscribe', function(packet) {
      console.log("subscribe" + ' ' + packet);
  });

  c.on('close', function() {
      console.log("close");
  });

  c.on('disconnect', function(packet) {
      console.log("disconnect" + ' ' + packet);
  });

  c.on('error', function(e) {
     console.log("error" + ' ' + packet);
  });


  c.publish(topic, message);

  c.end();

});
npm install mqtt
BROKER=10.0.1.3
node publish.js $BROKER 1883 arduino/temp "99999999"

Bridge HTTP REST <--> MQTT

BROKER=test.mosquitto.org
mosquitto_sub -h test.mosquitto.org -t "arduino/#" -v
BROKER=test.mosquitto.org
mosquitto_pub -h test.mosquitto.org -t "arduino/temp" -m "246"
BRIDGE=http://test-mosquitto.herokuapp.com
curl $BRIDGE/arduino/temp
curl -X PUT --data-binary "247" $BRIDGE/arduino/temp
curl -X POST --data-binary "248" $BRIDGE/arduino/temp

...

curl -X DELETE $BRIDGE/arduino/temp

Node-RED with Mosquitto

Node RED with MQTT

TODO with Arduino, Firmata, MQTT


Start the subscription

BROKER=test.mosquitto.org
mosquitto_sub -h $BROKER -d -t smartgrid/uk

Install Terminal Notifier (MacOS X 10.8)

sudo gem install terminal-notifier


node red.js flow_mqttoutput.json

Browse http://127.0.0.1:1880

Import (Ctrl-I) the following flow then deploy. The flow gets UK smartgrid data from http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx every minute, then parses it into a message then publishes the message on the smartgrid/uk topic of the test.mosquitto.org public server. The flow is inspired from the Node-RED documentation.

flow_mqttoutput.json

[{"id":"5ff55c38.ef0014","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"26aba4b0.704bb4","type":"mqtt out","name":"Mosquitto Server","topic":"smartgrid/uk","broker":"5ff55c38.ef0014","x":462,"y":611,"wires":[]},{"id":"963ae491.d8ac68","type":"debug","name":"Debug MQTT","active":true,"complete":"false","x":472,"y":664,"wires":[]},{"id":"b52ffa92.d6b9c","type":"http request","name":"UK National Grid Open Data","method":"GET","url":"http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx","x":171,"y":499,"wires":[["dbe1deb4.42e0d8"]]},{"id":"dbe1deb4.42e0d8","type":"function","name":"Parsing Grid Data","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n//   console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n//   context = {};\n\n    // does a simple text extract parse of the http output to provide an\n    // object containing the uk power demand, frequency and time\n    \n    console.log(msg.payload);\n    \n    if (~msg.payload.indexOf('<BR')) {\n      var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n      if (words.length >= 3) {\n        msg.payload = {};\n        msg.payload.demand = parseInt(words[0].split(\":\")[1]);\n        msg.payload.frequency = parseFloat(words[2].split(\":\")[1]);\n        msg.payload.time = words[1].split(\">\")[1];\n        \n        // Create the true/false signal based on the frequency.\n        msg2 = {};\n        msg2.payload = (msg.payload.frequency >= 50) ? true : false;\n        \n        return [msg,msg2];\n      }\n    }\n    return null;","outputs":1,"x":408,"y":502,"wires":[["26aba4b0.704bb4","963ae491.d8ac68","c3092d54.828b5"]]},{"id":"1e20ecfd.c0c393","type":"inject","name":"Every minute","topic":"","payload":"","repeat":"0","crontab":"*/1 0-22 * * *","once":true,"x":147,"y":378,"wires":[["b52ffa92.d6b9c"]]},{"id":"c3092d54.828b5","type":"exec","command":"echo 'New UK Smart Grid data !' | terminal-notifier -sound default","append":"","useSpawn":"","name":"Notify","x":460,"y":723,"wires":[[],[],[]]}]

From another machine, start :

node red.js flow_mqttinput.json

flow_mqttinput.json

[{"id":"f7611a87.089ee8","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"80408ea7.7fbf7","type":"mqtt in","name":"UK Smart grid","topic":"smartgrid/uk ","broker":"f7611a87.089ee8","x":920,"y":291,"wires":[["f4f3a91.f0b0c58","ca880431.3577f8","5eb0c39b.a14f3c"]]},{"id":"f4f3a91.f0b0c58","type":"debug","name":"","active":true,"complete":"false","x":1172,"y":296,"wires":[]},{"id":"ca880431.3577f8","type":"function","name":"Console Log","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n//   console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n//   context = {};\n\nconsole.log(msg.payload);\nreturn msg;","outputs":1,"x":1191,"y":341,"wires":[[]]},{"id":"5eb0c39b.a14f3c","type":"notify","title":"","name":"","x":1199,"y":400,"wires":[]}]

Examples