Difference between revisions of "Mosquitto"

From air
Jump to navigation Jump to search
 
(43 intermediate revisions by the same user not shown)
Line 27: Line 27:
 
===Lancement du broker===
 
===Lancement du broker===
 
Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto )
 
Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto )
  +
  +
Sur Linux
 
<pre>
 
<pre>
 
mosquitto
 
mosquitto
  +
</pre>
  +
  +
Sur MacOS
  +
<pre>
  +
/usr/local/sbin/mosquitto
 
</pre>
 
</pre>
   
 
==Publication & Souscription==
 
==Publication & Souscription==
 
===Command lines===
 
===Command lines===
  +
  +
====Publication en CLI====
  +
Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html
  +
<pre>
  +
BROKER=test.mosquitto.org
  +
BROKER=localhost
  +
mosquitto_pub -h $BROKER -d -t arduino/temp -m "100"
  +
</pre>
  +
  +
====Souscription en CLI====
 
Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html
 
Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html
 
<pre>
 
<pre>
BROKER=10.0.1.3
+
BROKER=test.mosquitto.org
  +
BROKER=localhost
 
mosquitto_sub -h $BROKER -d -t arduino/temp
 
mosquitto_sub -h $BROKER -d -t arduino/temp
 
</pre>
 
</pre>
   
  +
Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html
 
  +
Souscription aux topics ''systèmes''
 
<pre>
 
<pre>
  +
mosquitto_sub -h $BROKER -v -t \$SYS/#
BROKER=10.0.1.3
 
mosquitto_pub -h $BROKER -d -t arduino/temp -m "100"
 
 
</pre>
 
</pre>
   
 
===Python===
 
===Python===
  +
====Publication en Python====
Publication depuis le Raspberry Pi http://mosquitto.org/documentation/python/
 
  +
[[Image:Rpi+arduino.jpg|200px|thumb|right|RPI + Arduino running Mosquitto MQTT publisher]]
  +
[[Image:Agrisensor-inside-small.jpg|thumb|200px|right|Agrisensor pushing MQTT messages]]
  +
[[Image:Carparksensor.jpg|thumb|200px|right|Car park sensor pushing MQTT messages]]
  +
Publication depuis le Raspberry Pi (et un Arduino) http://mosquitto.org/documentation/python/
 
<pre>
 
<pre>
 
BROKER=10.0.1.3
 
BROKER=10.0.1.3
Line 70: Line 92:
   
 
broker=sys.argv[1]
 
broker=sys.argv[1]
#broker = "10.0.1.3"
 
 
port = 1883
 
port = 1883
   
Line 169: Line 190:
 
</pre>
 
</pre>
   
  +
====Souscription en Python====
   
 
Souscription depuis l'hôte http://mosquitto.org/documentation/python/
 
Souscription depuis l'hôte http://mosquitto.org/documentation/python/
Line 264: Line 286:
   
 
</pre>
 
</pre>
  +
  +
===[[Node.js]]===
  +
====Souscription avec [[Node.js]]====
  +
<pre>
  +
npm install mqtt
  +
BROKER=10.0.1.3
  +
node subscribe.js $BROKER 1883 arduino/temp
  +
</pre>
  +
  +
<pre>
  +
#!/usr/bin/env node
  +
  +
var mqtt = require('mqtt');
  +
  +
var argv = process.argv;
  +
  +
for (var i = 2; i <= 4; i++) {
  +
if(!argv[i]) process.exit(-1);
  +
}
  +
  +
var port = argv[3]
  +
, host = argv[2]
  +
, topic = argv[4];
  +
  +
var c = mqtt.createClient(port, host);
  +
  +
c.on('connect', function() {
  +
  +
c.on('message', function(topic, message) {
  +
console.log(topic + ' ' + message);
  +
});
  +
  +
c.subscribe(topic);
  +
});
  +
  +
</pre>
  +
  +
  +
  +
Simple Souscripteur avec une interface Web
  +
<pre>
  +
npm install mqtt
  +
npm install express
  +
BROKER=10.0.1.3
  +
node webmqttsub.js $BROKER 1883 arduino/#
  +
  +
node webmqttsub.js $BROKER 1883 bbc/#
  +
</pre>
  +
  +
<pre>
  +
curl http://localhost:3000/*
  +
curl http://localhost:3000/arduino/
  +
curl http://localhost:3000/arduino/temp
  +
</pre>
  +
  +
<pre>
  +
  +
  +
// Simple Gateway between MQTT et HTTP (REST)
  +
  +
// Author: Didier Donsez, 2013
  +
// TODO add PUT, POST (publish), DELETE method
  +
  +
var mqtt = require('mqtt');
  +
  +
var argv = process.argv;
  +
  +
for (var i = 2; i <= 4; i++) {
  +
if(!argv[i]) process.exit(-1);
  +
}
  +
  +
var brokerport = argv[3]
  +
, brokerhost = argv[2]
  +
, subscribedtopicpattern = argv[4];
  +
  +
var topics = new Object();
  +
  +
var c = mqtt.createClient(brokerport, brokerhost);
  +
  +
c.on('connect', function() {
  +
  +
c.on('message', function(topic, message) {
  +
console.log("receive "+ topic + ' ' + message);
  +
topics[topic]=message;
  +
});
  +
  +
console.log("subscribe to " + subscribedtopicpattern);
  +
c.subscribe(subscribedtopicpattern);
  +
});
  +
  +
var express = require('express');
  +
var app = express();
  +
app.use(function(req, res, next){
  +
var path=req.path;
  +
console.log('req.path=' + path);
  +
if(path=="/*"){
  +
res.set('Content-Type', 'application/json');
  +
res.send(JSON.stringify(topics));
  +
} else {
  +
var topic=path.substring(1);
  +
if (topics.hasOwnProperty(topic)) {
  +
if(req.get('Content-Type')=='application/json'){
  +
res.set('Content-Type', 'application/json');
  +
res.send( '{"'+topic+'","'+topics[topic]+'"}' );
  +
} else {
  +
res.set('Content-Type', 'text/plain');
  +
res.send(topics[topic]);
  +
}
  +
} else {
  +
res.send(404, 'Sorry, we cannot find topic '+ topic);
  +
}
  +
}
  +
});
  +
app.listen(3000);
  +
  +
  +
</pre>
  +
  +
====Publication avec [[Node.js]]====
  +
<pre>
  +
#!/usr/bin/env node
  +
  +
var mqtt = require('mqtt');
  +
  +
var argv = process.argv;
  +
  +
for (var i = 2; i <= 5; i++) {
  +
if(!argv[i]) process.exit(-1);
  +
}
  +
  +
var port = argv[3]
  +
, host = argv[2]
  +
, topic = argv[4];
  +
, message = argv[5];
  +
  +
var c = mqtt.createClient(port, host);
  +
  +
c.on('connect', function() {
  +
  +
c.on('subscribe', function(packet) {
  +
console.log("subscribe" + ' ' + packet);
  +
});
  +
  +
c.on('close', function() {
  +
console.log("close");
  +
});
  +
  +
c.on('disconnect', function(packet) {
  +
console.log("disconnect" + ' ' + packet);
  +
});
  +
  +
c.on('error', function(e) {
  +
console.log("error" + ' ' + packet);
  +
});
  +
  +
  +
c.publish(topic, message);
  +
  +
c.end();
  +
  +
});
  +
</pre>
  +
  +
<pre>
  +
npm install mqtt
  +
BROKER=10.0.1.3
  +
node publish.js $BROKER 1883 arduino/temp "99999999"
  +
</pre>
  +
  +
===Bridge HTTP REST <--> MQTT===
  +
  +
<pre>
  +
BROKER=test.mosquitto.org
  +
mosquitto_sub -h test.mosquitto.org -t "arduino/#" -v
  +
</pre>
  +
  +
<pre>
  +
BROKER=test.mosquitto.org
  +
mosquitto_pub -h test.mosquitto.org -t "arduino/temp" -m "246"
  +
</pre>
  +
  +
<pre>
  +
BRIDGE=http://test-mosquitto.herokuapp.com
  +
curl $BRIDGE/arduino/temp
  +
curl -X PUT --data-binary "247" $BRIDGE/arduino/temp
  +
curl -X POST --data-binary "248" $BRIDGE/arduino/temp
  +
  +
...
  +
  +
curl -X DELETE $BRIDGE/arduino/temp
  +
</pre>
  +
  +
===[[Node-RED]] with Mosquitto===
  +
[[Image:Node-RED.png|300px|right|thumb|Node RED with MQTT]]
  +
TODO with [[Arduino]], [[Firmata]], [[MQTT]]
  +
  +
  +
Start the subscription
  +
<pre>
  +
BROKER=test.mosquitto.org
  +
mosquitto_sub -h $BROKER -d -t smartgrid/uk
  +
</pre>
  +
  +
<pre>
  +
BROKER=m2m.eclipse.org
  +
mosquitto_sub -h $BROKER -d -t /smartgrid/uk/demand
  +
</pre>
  +
  +
Install [[Terminal Notifier]] (MacOS X 10.8)
  +
<pre>
  +
sudo gem install terminal-notifier
  +
</pre>
  +
  +
  +
  +
<pre>
  +
node red.js flow_mqttoutput.json
  +
</pre>
  +
  +
Browse http://127.0.0.1:1880
  +
  +
Import (Ctrl-I) the following flow then deploy. The flow gets UK smartgrid data from http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx every minute, then parses it into a message then publishes the message on the smartgrid/uk topic of the test.mosquitto.org public server. The flow is inspired from the [http://nodered.org/docs/getting-started/second-flow.html Node-RED documentation].
  +
  +
''flow_mqttoutput.json''
  +
<pre>
  +
[{"id":"1d2be19e.6a1c1e","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"5ff55c38.ef0014","type":"mqtt-broker","broker":"m2m.eclipse.org","port":"1883"},{"id":"26aba4b0.704bb4","type":"mqtt out","name":"Mosquitto Server","topic":"smartgrid/uk","broker":"5ff55c38.ef0014","x":541,"y":698,"wires":[]},{"id":"963ae491.d8ac68","type":"debug","name":"Debug MQTT","active":true,"complete":"false","x":612,"y":497,"wires":[]},{"id":"b52ffa92.d6b9c","type":"http request","name":"UK National Grid Open Data","method":"GET","url":"http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx","x":171,"y":499,"wires":[["dbe1deb4.42e0d8","778334c9.ca3c3c"]]},{"id":"dbe1deb4.42e0d8","type":"function","name":"Parsing Grid Data","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 3) {\n msg.payload = {};\n msg.payload.demand = parseInt(words[0].split(\":\")[1]);\n msg.payload.frequency = parseFloat(words[2].split(\":\")[1]);\n msg.payload.time = words[1].split(\">\")[1];\n \n // Create the true/false signal based on the frequency.\n msg2 = {};\n msg2.payload = (msg.payload.frequency >= 50) ? true : false;\n \n return [msg,msg2];\n }\n }\n return null;","outputs":1,"x":312,"y":600,"wires":[["26aba4b0.704bb4"]]},{"id":"1e20ecfd.c0c393","type":"inject","name":"Every minute","topic":"","payload":"","repeat":"0","crontab":"*/1 0-22 * * *","once":true,"x":147,"y":378,"wires":[["b52ffa92.d6b9c"]]},{"id":"c3092d54.828b5","type":"exec","command":"terminal-notifier -title 'Electricity Demand' -message ","append":"","useSpawn":"","name":"Notify","x":732,"y":420,"wires":[[],[],[]]},{"id":"62d8096.aa27f78","type":"mqtt out","name":"Eclipse Paho Server","topic":"smartgrid/uk/demand","broker":"5ff55c38.ef0014","x":563,"y":586,"wires":[]},{"id":"778334c9.ca3c3c","type":"function","name":"Parsing Demand Only","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 1) {\n\n msg.payload = parseInt(words[0].split(\":\")[1]);\n \n return msg;\n }\n }\n return null;","outputs":1,"x":398,"y":437,"wires":[["62d8096.aa27f78","a4521ff1.b25f98","963ae491.d8ac68","c3092d54.828b5"]]},{"id":"a4521ff1.b25f98","type":"mqtt out","name":"Mosquitto Server - Demand only","topic":"/smartgrid/uk/demand","broker":"1d2be19e.6a1c1e","x":553,"y":379,"wires":[]}]
  +
</pre>
  +
  +
From another machine, start :
  +
<pre>
  +
node red.js flow_mqttinput.json
  +
</pre>
  +
  +
''flow_mqttinput.json''
  +
<pre>
  +
[{"id":"f7611a87.089ee8","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"80408ea7.7fbf7","type":"mqtt in","name":"UK Smart grid","topic":"smartgrid/uk ","broker":"f7611a87.089ee8","x":920,"y":291,"wires":[["f4f3a91.f0b0c58","ca880431.3577f8","5eb0c39b.a14f3c"]]},{"id":"f4f3a91.f0b0c58","type":"debug","name":"","active":true,"complete":"false","x":1172,"y":296,"wires":[]},{"id":"ca880431.3577f8","type":"function","name":"Console Log","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\nconsole.log(msg.payload);\nreturn msg;","outputs":1,"x":1191,"y":341,"wires":[[]]},{"id":"5eb0c39b.a14f3c","type":"notify","title":"","name":"","x":1199,"y":400,"wires":[]}]
  +
</pre>
  +
  +
===MQTT Panel===
  +
[[Image:mqttpanel.png|thumb|200px|right|MQTT Panel]]
  +
MQTT Panel est une application ([[Node.js]]) de visualisation de données publiées sur des topics MQTT. Elle utilise [[JQuery.sparkline]].
  +
  +
Telechargez https://github.com/fabaff/mqtt-panel
  +
  +
Term 1:
  +
<pre>
  +
mosquitto
  +
/usr/local/sbin/mosquitto
  +
</pre>
  +
  +
  +
Term 2:
  +
<pre>
  +
node server.js
  +
</pre>
  +
  +
open index.html
  +
  +
Term 3:
  +
<pre>
  +
./test-messages.py
  +
</pre>
  +
  +
  +
===InfluxDB===
  +
Persitance avec [[InfluxDB#Node.js_et_MQTT|InfluxDB]]
  +
  +
==Examples==
  +
* [[Arduino-Based Sensor for Agriculture]]
  +
* [[SmartDollHouse]]

Latest revision as of 14:23, 31 May 2015

http://mosquitto.org/

Serveur MQTT écrit en C et C++

Installation rapide et simple


Premiers pas

Installation

sur Raspberry Pi

wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py -O - | python
sudo python get-pip.py

sur MacOS X

brew install mosquitto


sur Ubuntu

http://mosquitto.org/2013/01/mosquitto-debian-repository/

Lancement du broker

Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto )

Sur Linux

mosquitto

Sur MacOS

/usr/local/sbin/mosquitto

Publication & Souscription

Command lines

Publication en CLI

Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html

BROKER=test.mosquitto.org
BROKER=localhost
mosquitto_pub -h $BROKER -d -t arduino/temp -m "100"

Souscription en CLI

Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html

BROKER=test.mosquitto.org
BROKER=localhost
mosquitto_sub -h $BROKER -d -t arduino/temp


Souscription aux topics systèmes

mosquitto_sub -h $BROKER -v -t \$SYS/#

Python

Publication en Python

RPI + Arduino running Mosquitto MQTT publisher
Agrisensor pushing MQTT messages
Car park sensor pushing MQTT messages

Publication depuis le Raspberry Pi (et un Arduino) http://mosquitto.org/documentation/python/

BROKER=10.0.1.3
python publisharduino.py $BROKER
#!/usr/bin/python
#
# simple app to read string from serial port (arduino board)
# and publish via MQTT
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15

import serial
import mosquitto
import os
import sys

broker=sys.argv[1]
port = 1883

serialdev = '/dev/ttyACM0'



#MQTT callbacks

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")
    else:
        raise Exception


def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")


def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")


def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")


def on_unsubscribe(mosq, obj, mid):
    print("Unsubscribe with mid "+str(mid)+" received.")


def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)


#called on exit
#close serial, disconnect MQTT
def cleanup():
    print "Ending and cleaning up"
    ser.close()
    mqttc.disconnect()

try:
    print "Connecting... ", serialdev
    #connect to serial port
    ser = serial.Serial(serialdev, 9600, timeout=20)
except:
    print "Failed to connect serial"
    #unable to continue with no serial input
    raise SystemExit


try:
    ser.flushInput()
    #create an mqtt client
    mypid = os.getpid()
    client_uniq = "arduino_pub_"+str(mypid)
    mqttc = mosquitto.Mosquitto(client_uniq)

    #attach MQTT callbacks
    mqttc.on_connect = on_connect
    mqttc.on_disconnect = on_disconnect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe
    #mqttc.on_message = on_message

    #connect to broker
    mqttc.connect(broker, port, 60)

    #remain connected to broker
    #read data from serial and publish
    while mqttc.loop() == 0:
        line = ser.readline()
        #split line as it contains V,temp
        list = line.split(",")
        #second list element is temp
        temp = list[0].rstrip()
        print("Temp is "+temp)
        mqttc.publish("arduino/temp", temp)
        pass


# handle list index error (i.e. assume no data received)
except (IndexError):
    print "No data received within serial timeout period"
    cleanup()
# handle app closure
except (KeyboardInterrupt):
    print "Interrupt received"
    cleanup()
except (RuntimeError):
    print "uh-oh! time to die"
    cleanup()


Souscription en Python

Souscription depuis l'hôte http://mosquitto.org/documentation/python/

BROKER=10.0.1.3
python subscribearduino.py $BROKER
#!/usr/bin/python
#
# simple app to subscribe MQTT topic
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15

import os
import mosquitto
import sys

broker=sys.argv[1]
#broker = "10.0.1.3"
port = 1883

serialdev = '/dev/ttyACM0'

#MQTT callbacks

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("Connected successfully.")
    else:
        raise Exception

def on_disconnect(mosq, obj, rc):
    print("Disconnected successfully.")


def on_publish(mosq, obj, mid):
    print("Message "+str(mid)+" published.")


def on_subscribe(mosq, obj, mid, qos_list):
    print("Subscribe with mid "+str(mid)+" received.")


def on_unsubscribe(mosq, obj, mid):
    print("Unsubscribe with mid "+str(mid)+" received.")


def on_message(mosq, obj, msg):
    print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)


#called on exit
# disconnect MQTT
def cleanup():
    print "Ending and cleaning up"
    mqttc.disconnect()

try:
    #create an mqtt client
    mypid = os.getpid()
    client_uniq = "arduino_pub_"+str(mypid)
    mqttc = mosquitto.Mosquitto(client_uniq)

    #attach MQTT callbacks
    mqttc.on_connect = on_connect
    mqttc.on_disconnect = on_disconnect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe
    mqttc.on_unsubscribe = on_unsubscribe
    mqttc.on_message = on_message

    #connect to broker
    mqttc.connect(broker, port, 60)

    mqttc.subscribe("arduino/temp")

    #remain connected to broker
    while mqttc.loop() == 0:
        pass


# handle app closure
except (KeyboardInterrupt):
    print "Interrupt received"
    cleanup()
except (RuntimeError):
    print "uh-oh! time to die"
    cleanup()

Node.js

Souscription avec Node.js

npm install mqtt
BROKER=10.0.1.3
node subscribe.js $BROKER 1883 arduino/temp
#!/usr/bin/env node

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 4; i++) {
  if(!argv[i]) process.exit(-1);
}

var port = argv[3]
  , host = argv[2]
  , topic = argv[4];

var c = mqtt.createClient(port, host);

c.on('connect', function() {

  c.on('message', function(topic, message) {
    console.log(topic + ' ' + message);
  });

  c.subscribe(topic);
});


Simple Souscripteur avec une interface Web

npm install mqtt
npm install express
BROKER=10.0.1.3
node webmqttsub.js $BROKER 1883 arduino/#

node webmqttsub.js $BROKER 1883 bbc/#
curl http://localhost:3000/*
curl http://localhost:3000/arduino/
curl http://localhost:3000/arduino/temp


// Simple Gateway between MQTT et HTTP (REST)

// Author: Didier Donsez, 2013
// TODO add PUT, POST (publish), DELETE method

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 4; i++) {
  if(!argv[i]) process.exit(-1);
}

var brokerport = argv[3]
  , brokerhost = argv[2]
  , subscribedtopicpattern = argv[4];

var topics = new Object();

var c = mqtt.createClient(brokerport, brokerhost);

c.on('connect', function() {

  c.on('message', function(topic, message) {
    console.log("receive "+ topic + ' ' + message);
      topics[topic]=message;
  });

  console.log("subscribe to " + subscribedtopicpattern);
  c.subscribe(subscribedtopicpattern);
});

var express = require('express');
var app = express();
app.use(function(req, res, next){
    var path=req.path;
    console.log('req.path=' + path);
    if(path=="/*"){
	res.set('Content-Type', 'application/json');
	res.send(JSON.stringify(topics));
     } else {
         var topic=path.substring(1);
	 if (topics.hasOwnProperty(topic))  {
	     if(req.get('Content-Type')=='application/json'){
		 res.set('Content-Type', 'application/json');
		 res.send( '{"'+topic+'","'+topics[topic]+'"}' );
	     } else { 
		 res.set('Content-Type', 'text/plain');
		 res.send(topics[topic]);
	     }
	 } else {
	     res.send(404, 'Sorry, we cannot find topic '+ topic);
	 }
     }
});
app.listen(3000);


Publication avec Node.js

#!/usr/bin/env node

var mqtt = require('mqtt');

var argv = process.argv;

for (var i = 2; i <= 5; i++) {
  if(!argv[i]) process.exit(-1);
}

var port = argv[3]
  , host = argv[2]
  , topic = argv[4];
  , message = argv[5];

var c = mqtt.createClient(port, host);

c.on('connect', function() {

  c.on('subscribe', function(packet) {
      console.log("subscribe" + ' ' + packet);
  });

  c.on('close', function() {
      console.log("close");
  });

  c.on('disconnect', function(packet) {
      console.log("disconnect" + ' ' + packet);
  });

  c.on('error', function(e) {
     console.log("error" + ' ' + packet);
  });


  c.publish(topic, message);

  c.end();

});
npm install mqtt
BROKER=10.0.1.3
node publish.js $BROKER 1883 arduino/temp "99999999"

Bridge HTTP REST <--> MQTT

BROKER=test.mosquitto.org
mosquitto_sub -h test.mosquitto.org -t "arduino/#" -v
BROKER=test.mosquitto.org
mosquitto_pub -h test.mosquitto.org -t "arduino/temp" -m "246"
BRIDGE=http://test-mosquitto.herokuapp.com
curl $BRIDGE/arduino/temp
curl -X PUT --data-binary "247" $BRIDGE/arduino/temp
curl -X POST --data-binary "248" $BRIDGE/arduino/temp

...

curl -X DELETE $BRIDGE/arduino/temp

Node-RED with Mosquitto

Node RED with MQTT

TODO with Arduino, Firmata, MQTT


Start the subscription

BROKER=test.mosquitto.org
mosquitto_sub -h $BROKER -d -t smartgrid/uk
BROKER=m2m.eclipse.org
mosquitto_sub -h $BROKER -d -t /smartgrid/uk/demand

Install Terminal Notifier (MacOS X 10.8)

sudo gem install terminal-notifier


node red.js flow_mqttoutput.json

Browse http://127.0.0.1:1880

Import (Ctrl-I) the following flow then deploy. The flow gets UK smartgrid data from http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx every minute, then parses it into a message then publishes the message on the smartgrid/uk topic of the test.mosquitto.org public server. The flow is inspired from the Node-RED documentation.

flow_mqttoutput.json

[{"id":"1d2be19e.6a1c1e","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"5ff55c38.ef0014","type":"mqtt-broker","broker":"m2m.eclipse.org","port":"1883"},{"id":"26aba4b0.704bb4","type":"mqtt out","name":"Mosquitto Server","topic":"smartgrid/uk","broker":"5ff55c38.ef0014","x":541,"y":698,"wires":[]},{"id":"963ae491.d8ac68","type":"debug","name":"Debug MQTT","active":true,"complete":"false","x":612,"y":497,"wires":[]},{"id":"b52ffa92.d6b9c","type":"http request","name":"UK National Grid Open Data","method":"GET","url":"http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx","x":171,"y":499,"wires":[["dbe1deb4.42e0d8","778334c9.ca3c3c"]]},{"id":"dbe1deb4.42e0d8","type":"function","name":"Parsing Grid Data","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n//   console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n//   context = {};\n\n    // does a simple text extract parse of the http output to provide an\n    // object containing the uk power demand, frequency and time\n    \n    console.log(msg.payload);\n    \n    if (~msg.payload.indexOf('<BR')) {\n      var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n      if (words.length >= 3) {\n        msg.payload = {};\n        msg.payload.demand = parseInt(words[0].split(\":\")[1]);\n        msg.payload.frequency = parseFloat(words[2].split(\":\")[1]);\n        msg.payload.time = words[1].split(\">\")[1];\n        \n        // Create the true/false signal based on the frequency.\n        msg2 = {};\n        msg2.payload = (msg.payload.frequency >= 50) ? true : false;\n        \n        return [msg,msg2];\n      }\n    }\n    return null;","outputs":1,"x":312,"y":600,"wires":[["26aba4b0.704bb4"]]},{"id":"1e20ecfd.c0c393","type":"inject","name":"Every minute","topic":"","payload":"","repeat":"0","crontab":"*/1 0-22 * * *","once":true,"x":147,"y":378,"wires":[["b52ffa92.d6b9c"]]},{"id":"c3092d54.828b5","type":"exec","command":"terminal-notifier -title 'Electricity Demand' -message ","append":"","useSpawn":"","name":"Notify","x":732,"y":420,"wires":[[],[],[]]},{"id":"62d8096.aa27f78","type":"mqtt out","name":"Eclipse Paho Server","topic":"smartgrid/uk/demand","broker":"5ff55c38.ef0014","x":563,"y":586,"wires":[]},{"id":"778334c9.ca3c3c","type":"function","name":"Parsing Demand Only","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n//   console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n//   context = {};\n\n    // does a simple text extract parse of the http output to provide an\n    // object containing the uk power demand, frequency and time\n    \n    console.log(msg.payload);\n    \n    if (~msg.payload.indexOf('<BR')) {\n      var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n      if (words.length >= 1) {\n\n        msg.payload = parseInt(words[0].split(\":\")[1]);\n        \n        return msg;\n      }\n    }\n    return null;","outputs":1,"x":398,"y":437,"wires":[["62d8096.aa27f78","a4521ff1.b25f98","963ae491.d8ac68","c3092d54.828b5"]]},{"id":"a4521ff1.b25f98","type":"mqtt out","name":"Mosquitto Server - Demand only","topic":"/smartgrid/uk/demand","broker":"1d2be19e.6a1c1e","x":553,"y":379,"wires":[]}]

From another machine, start :

node red.js flow_mqttinput.json

flow_mqttinput.json

[{"id":"f7611a87.089ee8","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"80408ea7.7fbf7","type":"mqtt in","name":"UK Smart grid","topic":"smartgrid/uk ","broker":"f7611a87.089ee8","x":920,"y":291,"wires":[["f4f3a91.f0b0c58","ca880431.3577f8","5eb0c39b.a14f3c"]]},{"id":"f4f3a91.f0b0c58","type":"debug","name":"","active":true,"complete":"false","x":1172,"y":296,"wires":[]},{"id":"ca880431.3577f8","type":"function","name":"Console Log","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n//   console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n//   context = {};\n\nconsole.log(msg.payload);\nreturn msg;","outputs":1,"x":1191,"y":341,"wires":[[]]},{"id":"5eb0c39b.a14f3c","type":"notify","title":"","name":"","x":1199,"y":400,"wires":[]}]

MQTT Panel

MQTT Panel

MQTT Panel est une application (Node.js) de visualisation de données publiées sur des topics MQTT. Elle utilise JQuery.sparkline.

Telechargez https://github.com/fabaff/mqtt-panel

Term 1:

mosquitto
/usr/local/sbin/mosquitto


Term 2:

node server.js

open index.html

Term 3:

./test-messages.py


InfluxDB

Persitance avec InfluxDB

Examples