Mosquitto: Difference between revisions
No edit summary |
|||
| (48 intermediate revisions by the same user not shown) | |||
| Line 14: | Line 14: | ||
sudo python get-pip.py |
sudo python get-pip.py |
||
</pre> |
|||
====sur MacOS X==== |
|||
<pre> |
|||
brew install mosquitto |
|||
</pre> |
|||
====sur Ubuntu==== |
|||
http://mosquitto.org/2013/01/mosquitto-debian-repository/ |
|||
===Lancement du broker=== |
|||
Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto ) |
|||
Sur Linux |
|||
<pre> |
|||
mosquitto |
|||
</pre> |
</pre> |
||
Sur MacOS |
|||
<pre> |
<pre> |
||
/usr/local/sbin/mosquitto |
|||
</pre> |
</pre> |
||
==Publication & Souscription== |
|||
===Command lines=== |
|||
====Publication en CLI==== |
|||
Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html |
|||
<pre> |
|||
BROKER=test.mosquitto.org |
|||
BROKER=localhost |
|||
mosquitto_pub -h $BROKER -d -t arduino/temp -m "100" |
|||
</pre> |
|||
====Souscription en CLI==== |
|||
Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html |
|||
<pre> |
|||
BROKER=test.mosquitto.org |
|||
BROKER=localhost |
|||
mosquitto_sub -h $BROKER -d -t arduino/temp |
|||
</pre> |
|||
Souscription aux topics ''systèmes'' |
|||
<pre> |
|||
mosquitto_sub -h $BROKER -v -t \$SYS/# |
|||
</pre> |
|||
===Python=== |
|||
====Publication en Python==== |
|||
[[Image:Rpi+arduino.jpg|200px|thumb|right|RPI + Arduino running Mosquitto MQTT publisher]] |
|||
[[Image:Agrisensor-inside-small.jpg|thumb|200px|right|Agrisensor pushing MQTT messages]] |
|||
[[Image:Carparksensor.jpg|thumb|200px|right|Car park sensor pushing MQTT messages]] |
|||
Publication depuis le Raspberry Pi (et un Arduino) http://mosquitto.org/documentation/python/ |
|||
<pre> |
|||
BROKER=10.0.1.3 |
|||
python publisharduino.py $BROKER |
|||
</pre> |
|||
<pre> |
|||
#!/usr/bin/python |
|||
# |
|||
# simple app to read string from serial port (arduino board) |
|||
# and publish via MQTT |
|||
# |
|||
# uses the Python MQTT client from the Mosquitto project |
|||
# http://mosquitto.org |
|||
# |
|||
# initially Andy Piper http://andypiper.co.uk |
|||
# 2011/09/15 |
|||
import serial |
|||
import mosquitto |
|||
import os |
|||
import sys |
|||
broker=sys.argv[1] |
|||
port = 1883 |
|||
serialdev = '/dev/ttyACM0' |
|||
#MQTT callbacks |
|||
def on_connect(mosq, obj, rc): |
|||
if rc == 0: |
|||
print("Connected successfully.") |
|||
else: |
|||
raise Exception |
|||
def on_disconnect(mosq, obj, rc): |
|||
print("Disconnected successfully.") |
|||
def on_publish(mosq, obj, mid): |
|||
print("Message "+str(mid)+" published.") |
|||
def on_subscribe(mosq, obj, mid, qos_list): |
|||
print("Subscribe with mid "+str(mid)+" received.") |
|||
def on_unsubscribe(mosq, obj, mid): |
|||
print("Unsubscribe with mid "+str(mid)+" received.") |
|||
def on_message(mosq, obj, msg): |
|||
print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload) |
|||
#called on exit |
|||
#close serial, disconnect MQTT |
|||
def cleanup(): |
|||
print "Ending and cleaning up" |
|||
ser.close() |
|||
mqttc.disconnect() |
|||
try: |
|||
print "Connecting... ", serialdev |
|||
#connect to serial port |
|||
ser = serial.Serial(serialdev, 9600, timeout=20) |
|||
except: |
|||
print "Failed to connect serial" |
|||
#unable to continue with no serial input |
|||
raise SystemExit |
|||
try: |
|||
ser.flushInput() |
|||
#create an mqtt client |
|||
mypid = os.getpid() |
|||
client_uniq = "arduino_pub_"+str(mypid) |
|||
mqttc = mosquitto.Mosquitto(client_uniq) |
|||
#attach MQTT callbacks |
|||
mqttc.on_connect = on_connect |
|||
mqttc.on_disconnect = on_disconnect |
|||
mqttc.on_publish = on_publish |
|||
mqttc.on_subscribe = on_subscribe |
|||
mqttc.on_unsubscribe = on_unsubscribe |
|||
#mqttc.on_message = on_message |
|||
#connect to broker |
|||
mqttc.connect(broker, port, 60) |
|||
#remain connected to broker |
|||
#read data from serial and publish |
|||
while mqttc.loop() == 0: |
|||
line = ser.readline() |
|||
#split line as it contains V,temp |
|||
list = line.split(",") |
|||
#second list element is temp |
|||
temp = list[0].rstrip() |
|||
print("Temp is "+temp) |
|||
mqttc.publish("arduino/temp", temp) |
|||
pass |
|||
# handle list index error (i.e. assume no data received) |
|||
except (IndexError): |
|||
print "No data received within serial timeout period" |
|||
cleanup() |
|||
# handle app closure |
|||
except (KeyboardInterrupt): |
|||
print "Interrupt received" |
|||
cleanup() |
|||
except (RuntimeError): |
|||
print "uh-oh! time to die" |
|||
cleanup() |
|||
</pre> |
|||
====Souscription en Python==== |
|||
Souscription depuis l'hôte http://mosquitto.org/documentation/python/ |
|||
<pre> |
|||
BROKER=10.0.1.3 |
|||
python subscribearduino.py $BROKER |
|||
</pre> |
|||
<pre> |
|||
#!/usr/bin/python |
|||
# |
|||
# simple app to subscribe MQTT topic |
|||
# |
|||
# uses the Python MQTT client from the Mosquitto project |
|||
# http://mosquitto.org |
|||
# |
|||
# initially Andy Piper http://andypiper.co.uk |
|||
# 2011/09/15 |
|||
import os |
|||
import mosquitto |
|||
import sys |
|||
broker=sys.argv[1] |
|||
#broker = "10.0.1.3" |
|||
port = 1883 |
|||
serialdev = '/dev/ttyACM0' |
|||
#MQTT callbacks |
|||
def on_connect(mosq, obj, rc): |
|||
if rc == 0: |
|||
print("Connected successfully.") |
|||
else: |
|||
raise Exception |
|||
def on_disconnect(mosq, obj, rc): |
|||
print("Disconnected successfully.") |
|||
def on_publish(mosq, obj, mid): |
|||
print("Message "+str(mid)+" published.") |
|||
def on_subscribe(mosq, obj, mid, qos_list): |
|||
print("Subscribe with mid "+str(mid)+" received.") |
|||
def on_unsubscribe(mosq, obj, mid): |
|||
print("Unsubscribe with mid "+str(mid)+" received.") |
|||
def on_message(mosq, obj, msg): |
|||
print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload) |
|||
#called on exit |
|||
# disconnect MQTT |
|||
def cleanup(): |
|||
print "Ending and cleaning up" |
|||
mqttc.disconnect() |
|||
try: |
|||
#create an mqtt client |
|||
mypid = os.getpid() |
|||
client_uniq = "arduino_pub_"+str(mypid) |
|||
mqttc = mosquitto.Mosquitto(client_uniq) |
|||
#attach MQTT callbacks |
|||
mqttc.on_connect = on_connect |
|||
mqttc.on_disconnect = on_disconnect |
|||
mqttc.on_publish = on_publish |
|||
mqttc.on_subscribe = on_subscribe |
|||
mqttc.on_unsubscribe = on_unsubscribe |
|||
mqttc.on_message = on_message |
|||
#connect to broker |
|||
mqttc.connect(broker, port, 60) |
|||
mqttc.subscribe("arduino/temp") |
|||
#remain connected to broker |
|||
while mqttc.loop() == 0: |
|||
pass |
|||
# handle app closure |
|||
except (KeyboardInterrupt): |
|||
print "Interrupt received" |
|||
cleanup() |
|||
except (RuntimeError): |
|||
print "uh-oh! time to die" |
|||
cleanup() |
|||
</pre> |
|||
===[[Node.js]]=== |
|||
====Souscription avec [[Node.js]]==== |
|||
<pre> |
|||
npm install mqtt |
|||
BROKER=10.0.1.3 |
|||
node subscribe.js $BROKER 1883 arduino/temp |
|||
</pre> |
|||
<pre> |
|||
#!/usr/bin/env node |
|||
var mqtt = require('mqtt'); |
|||
var argv = process.argv; |
|||
for (var i = 2; i <= 4; i++) { |
|||
if(!argv[i]) process.exit(-1); |
|||
} |
|||
var port = argv[3] |
|||
, host = argv[2] |
|||
, topic = argv[4]; |
|||
var c = mqtt.createClient(port, host); |
|||
c.on('connect', function() { |
|||
c.on('message', function(topic, message) { |
|||
console.log(topic + ' ' + message); |
|||
}); |
|||
c.subscribe(topic); |
|||
}); |
|||
</pre> |
|||
Simple Souscripteur avec une interface Web |
|||
<pre> |
|||
npm install mqtt |
|||
npm install express |
|||
BROKER=10.0.1.3 |
|||
node webmqttsub.js $BROKER 1883 arduino/# |
|||
node webmqttsub.js $BROKER 1883 bbc/# |
|||
</pre> |
|||
<pre> |
|||
curl http://localhost:3000/* |
|||
curl http://localhost:3000/arduino/ |
|||
curl http://localhost:3000/arduino/temp |
|||
</pre> |
|||
<pre> |
|||
// Simple Gateway between MQTT et HTTP (REST) |
|||
// Author: Didier Donsez, 2013 |
|||
// TODO add PUT, POST (publish), DELETE method |
|||
var mqtt = require('mqtt'); |
|||
var argv = process.argv; |
|||
for (var i = 2; i <= 4; i++) { |
|||
if(!argv[i]) process.exit(-1); |
|||
} |
|||
var brokerport = argv[3] |
|||
, brokerhost = argv[2] |
|||
, subscribedtopicpattern = argv[4]; |
|||
var topics = new Object(); |
|||
var c = mqtt.createClient(brokerport, brokerhost); |
|||
c.on('connect', function() { |
|||
c.on('message', function(topic, message) { |
|||
console.log("receive "+ topic + ' ' + message); |
|||
topics[topic]=message; |
|||
}); |
|||
console.log("subscribe to " + subscribedtopicpattern); |
|||
c.subscribe(subscribedtopicpattern); |
|||
}); |
|||
var express = require('express'); |
|||
var app = express(); |
|||
app.use(function(req, res, next){ |
|||
var path=req.path; |
|||
console.log('req.path=' + path); |
|||
if(path=="/*"){ |
|||
res.set('Content-Type', 'application/json'); |
|||
res.send(JSON.stringify(topics)); |
|||
} else { |
|||
var topic=path.substring(1); |
|||
if (topics.hasOwnProperty(topic)) { |
|||
if(req.get('Content-Type')=='application/json'){ |
|||
res.set('Content-Type', 'application/json'); |
|||
res.send( '{"'+topic+'","'+topics[topic]+'"}' ); |
|||
} else { |
|||
res.set('Content-Type', 'text/plain'); |
|||
res.send(topics[topic]); |
|||
} |
|||
} else { |
|||
res.send(404, 'Sorry, we cannot find topic '+ topic); |
|||
} |
|||
} |
|||
}); |
|||
app.listen(3000); |
|||
</pre> |
|||
====Publication avec [[Node.js]]==== |
|||
<pre> |
|||
#!/usr/bin/env node |
|||
var mqtt = require('mqtt'); |
|||
var argv = process.argv; |
|||
for (var i = 2; i <= 5; i++) { |
|||
if(!argv[i]) process.exit(-1); |
|||
} |
|||
var port = argv[3] |
|||
, host = argv[2] |
|||
, topic = argv[4]; |
|||
, message = argv[5]; |
|||
var c = mqtt.createClient(port, host); |
|||
c.on('connect', function() { |
|||
c.on('subscribe', function(packet) { |
|||
console.log("subscribe" + ' ' + packet); |
|||
}); |
|||
c.on('close', function() { |
|||
console.log("close"); |
|||
}); |
|||
c.on('disconnect', function(packet) { |
|||
console.log("disconnect" + ' ' + packet); |
|||
}); |
|||
c.on('error', function(e) { |
|||
console.log("error" + ' ' + packet); |
|||
}); |
|||
c.publish(topic, message); |
|||
c.end(); |
|||
}); |
|||
</pre> |
|||
<pre> |
|||
npm install mqtt |
|||
BROKER=10.0.1.3 |
|||
node publish.js $BROKER 1883 arduino/temp "99999999" |
|||
</pre> |
|||
===Bridge HTTP REST <--> MQTT=== |
|||
<pre> |
|||
BROKER=test.mosquitto.org |
|||
mosquitto_sub -h test.mosquitto.org -t "arduino/#" -v |
|||
</pre> |
|||
<pre> |
|||
BROKER=test.mosquitto.org |
|||
mosquitto_pub -h test.mosquitto.org -t "arduino/temp" -m "246" |
|||
</pre> |
|||
<pre> |
|||
BRIDGE=http://test-mosquitto.herokuapp.com |
|||
curl $BRIDGE/arduino/temp |
|||
curl -X PUT --data-binary "247" $BRIDGE/arduino/temp |
|||
curl -X POST --data-binary "248" $BRIDGE/arduino/temp |
|||
... |
|||
curl -X DELETE $BRIDGE/arduino/temp |
|||
</pre> |
|||
===[[Node-RED]] with Mosquitto=== |
|||
[[Image:Node-RED.png|300px|right|thumb|Node RED with MQTT]] |
|||
TODO with [[Arduino]], [[Firmata]], [[MQTT]] |
|||
Start the subscription |
|||
<pre> |
|||
BROKER=test.mosquitto.org |
|||
mosquitto_sub -h $BROKER -d -t smartgrid/uk |
|||
</pre> |
|||
<pre> |
|||
BROKER=m2m.eclipse.org |
|||
mosquitto_sub -h $BROKER -d -t /smartgrid/uk/demand |
|||
</pre> |
|||
Install [[Terminal Notifier]] (MacOS X 10.8) |
|||
<pre> |
|||
sudo gem install terminal-notifier |
|||
</pre> |
|||
<pre> |
|||
node red.js flow_mqttoutput.json |
|||
</pre> |
|||
Browse http://127.0.0.1:1880 |
|||
Import (Ctrl-I) the following flow then deploy. The flow gets UK smartgrid data from http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx every minute, then parses it into a message then publishes the message on the smartgrid/uk topic of the test.mosquitto.org public server. The flow is inspired from the [http://nodered.org/docs/getting-started/second-flow.html Node-RED documentation]. |
|||
''flow_mqttoutput.json'' |
|||
<pre> |
|||
[{"id":"1d2be19e.6a1c1e","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"5ff55c38.ef0014","type":"mqtt-broker","broker":"m2m.eclipse.org","port":"1883"},{"id":"26aba4b0.704bb4","type":"mqtt out","name":"Mosquitto Server","topic":"smartgrid/uk","broker":"5ff55c38.ef0014","x":541,"y":698,"wires":[]},{"id":"963ae491.d8ac68","type":"debug","name":"Debug MQTT","active":true,"complete":"false","x":612,"y":497,"wires":[]},{"id":"b52ffa92.d6b9c","type":"http request","name":"UK National Grid Open Data","method":"GET","url":"http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx","x":171,"y":499,"wires":[["dbe1deb4.42e0d8","778334c9.ca3c3c"]]},{"id":"dbe1deb4.42e0d8","type":"function","name":"Parsing Grid Data","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 3) {\n msg.payload = {};\n msg.payload.demand = parseInt(words[0].split(\":\")[1]);\n msg.payload.frequency = parseFloat(words[2].split(\":\")[1]);\n msg.payload.time = words[1].split(\">\")[1];\n \n // Create the true/false signal based on the frequency.\n msg2 = {};\n msg2.payload = (msg.payload.frequency >= 50) ? true : false;\n \n return [msg,msg2];\n }\n }\n return null;","outputs":1,"x":312,"y":600,"wires":[["26aba4b0.704bb4"]]},{"id":"1e20ecfd.c0c393","type":"inject","name":"Every minute","topic":"","payload":"","repeat":"0","crontab":"*/1 0-22 * * *","once":true,"x":147,"y":378,"wires":[["b52ffa92.d6b9c"]]},{"id":"c3092d54.828b5","type":"exec","command":"terminal-notifier -title 'Electricity Demand' -message ","append":"","useSpawn":"","name":"Notify","x":732,"y":420,"wires":[[],[],[]]},{"id":"62d8096.aa27f78","type":"mqtt out","name":"Eclipse Paho Server","topic":"smartgrid/uk/demand","broker":"5ff55c38.ef0014","x":563,"y":586,"wires":[]},{"id":"778334c9.ca3c3c","type":"function","name":"Parsing Demand Only","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 1) {\n\n msg.payload = parseInt(words[0].split(\":\")[1]);\n \n return msg;\n }\n }\n return null;","outputs":1,"x":398,"y":437,"wires":[["62d8096.aa27f78","a4521ff1.b25f98","963ae491.d8ac68","c3092d54.828b5"]]},{"id":"a4521ff1.b25f98","type":"mqtt out","name":"Mosquitto Server - Demand only","topic":"/smartgrid/uk/demand","broker":"1d2be19e.6a1c1e","x":553,"y":379,"wires":[]}] |
|||
</pre> |
|||
From another machine, start : |
|||
<pre> |
|||
node red.js flow_mqttinput.json |
|||
</pre> |
|||
''flow_mqttinput.json'' |
|||
<pre> |
|||
[{"id":"f7611a87.089ee8","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"80408ea7.7fbf7","type":"mqtt in","name":"UK Smart grid","topic":"smartgrid/uk ","broker":"f7611a87.089ee8","x":920,"y":291,"wires":[["f4f3a91.f0b0c58","ca880431.3577f8","5eb0c39b.a14f3c"]]},{"id":"f4f3a91.f0b0c58","type":"debug","name":"","active":true,"complete":"false","x":1172,"y":296,"wires":[]},{"id":"ca880431.3577f8","type":"function","name":"Console Log","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\nconsole.log(msg.payload);\nreturn msg;","outputs":1,"x":1191,"y":341,"wires":[[]]},{"id":"5eb0c39b.a14f3c","type":"notify","title":"","name":"","x":1199,"y":400,"wires":[]}] |
|||
</pre> |
|||
===MQTT Panel=== |
|||
[[Image:mqttpanel.png|thumb|200px|right|MQTT Panel]] |
|||
MQTT Panel est une application ([[Node.js]]) de visualisation de données publiées sur des topics MQTT. Elle utilise [[JQuery.sparkline]]. |
|||
Telechargez https://github.com/fabaff/mqtt-panel |
|||
Term 1: |
|||
<pre> |
|||
mosquitto |
|||
/usr/local/sbin/mosquitto |
|||
</pre> |
|||
Term 2: |
|||
<pre> |
|||
node server.js |
|||
</pre> |
|||
open index.html |
|||
Term 3: |
|||
<pre> |
|||
./test-messages.py |
|||
</pre> |
|||
===InfluxDB=== |
|||
Persitance avec [[InfluxDB#Node.js_et_MQTT|InfluxDB]] |
|||
==Examples== |
|||
* [[Arduino-Based Sensor for Agriculture]] |
|||
* [[SmartDollHouse]] |
|||
Latest revision as of 12:23, 31 May 2015
Serveur MQTT écrit en C et C++
Installation rapide et simple
Premiers pas
Installation
sur Raspberry Pi
wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py -O - | python sudo python get-pip.py
sur MacOS X
brew install mosquitto
sur Ubuntu
http://mosquitto.org/2013/01/mosquitto-debian-repository/
Lancement du broker
Lancez le broker Mosquitto sur l'hôte (10.0.1.3). Remarque : le broker est parfois lancé via initd (vérifiez avec ps -ax | grep mosquitto )
Sur Linux
mosquitto
Sur MacOS
/usr/local/sbin/mosquitto
Publication & Souscription
Command lines
Publication en CLI
Publication depuis le Raspberry Pi http://mosquitto.org/man/mosquitto_pub-1.html
BROKER=test.mosquitto.org BROKER=localhost mosquitto_pub -h $BROKER -d -t arduino/temp -m "100"
Souscription en CLI
Souscription depuis l'hôte http://mosquitto.org/man/mosquitto_sub-1.html
BROKER=test.mosquitto.org BROKER=localhost mosquitto_sub -h $BROKER -d -t arduino/temp
Souscription aux topics systèmes
mosquitto_sub -h $BROKER -v -t \$SYS/#
Python
Publication en Python
Publication depuis le Raspberry Pi (et un Arduino) http://mosquitto.org/documentation/python/
BROKER=10.0.1.3 python publisharduino.py $BROKER
#!/usr/bin/python
#
# simple app to read string from serial port (arduino board)
# and publish via MQTT
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15
import serial
import mosquitto
import os
import sys
broker=sys.argv[1]
port = 1883
serialdev = '/dev/ttyACM0'
#MQTT callbacks
def on_connect(mosq, obj, rc):
if rc == 0:
print("Connected successfully.")
else:
raise Exception
def on_disconnect(mosq, obj, rc):
print("Disconnected successfully.")
def on_publish(mosq, obj, mid):
print("Message "+str(mid)+" published.")
def on_subscribe(mosq, obj, mid, qos_list):
print("Subscribe with mid "+str(mid)+" received.")
def on_unsubscribe(mosq, obj, mid):
print("Unsubscribe with mid "+str(mid)+" received.")
def on_message(mosq, obj, msg):
print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)
#called on exit
#close serial, disconnect MQTT
def cleanup():
print "Ending and cleaning up"
ser.close()
mqttc.disconnect()
try:
print "Connecting... ", serialdev
#connect to serial port
ser = serial.Serial(serialdev, 9600, timeout=20)
except:
print "Failed to connect serial"
#unable to continue with no serial input
raise SystemExit
try:
ser.flushInput()
#create an mqtt client
mypid = os.getpid()
client_uniq = "arduino_pub_"+str(mypid)
mqttc = mosquitto.Mosquitto(client_uniq)
#attach MQTT callbacks
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
#mqttc.on_message = on_message
#connect to broker
mqttc.connect(broker, port, 60)
#remain connected to broker
#read data from serial and publish
while mqttc.loop() == 0:
line = ser.readline()
#split line as it contains V,temp
list = line.split(",")
#second list element is temp
temp = list[0].rstrip()
print("Temp is "+temp)
mqttc.publish("arduino/temp", temp)
pass
# handle list index error (i.e. assume no data received)
except (IndexError):
print "No data received within serial timeout period"
cleanup()
# handle app closure
except (KeyboardInterrupt):
print "Interrupt received"
cleanup()
except (RuntimeError):
print "uh-oh! time to die"
cleanup()
Souscription en Python
Souscription depuis l'hôte http://mosquitto.org/documentation/python/
BROKER=10.0.1.3 python subscribearduino.py $BROKER
#!/usr/bin/python
#
# simple app to subscribe MQTT topic
#
# uses the Python MQTT client from the Mosquitto project
# http://mosquitto.org
#
# initially Andy Piper http://andypiper.co.uk
# 2011/09/15
import os
import mosquitto
import sys
broker=sys.argv[1]
#broker = "10.0.1.3"
port = 1883
serialdev = '/dev/ttyACM0'
#MQTT callbacks
def on_connect(mosq, obj, rc):
if rc == 0:
print("Connected successfully.")
else:
raise Exception
def on_disconnect(mosq, obj, rc):
print("Disconnected successfully.")
def on_publish(mosq, obj, mid):
print("Message "+str(mid)+" published.")
def on_subscribe(mosq, obj, mid, qos_list):
print("Subscribe with mid "+str(mid)+" received.")
def on_unsubscribe(mosq, obj, mid):
print("Unsubscribe with mid "+str(mid)+" received.")
def on_message(mosq, obj, msg):
print("Message received on topic "+msg.topic+" with QoS "+str(msg.qos)+" and payload "+msg.payload)
#called on exit
# disconnect MQTT
def cleanup():
print "Ending and cleaning up"
mqttc.disconnect()
try:
#create an mqtt client
mypid = os.getpid()
client_uniq = "arduino_pub_"+str(mypid)
mqttc = mosquitto.Mosquitto(client_uniq)
#attach MQTT callbacks
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.on_message = on_message
#connect to broker
mqttc.connect(broker, port, 60)
mqttc.subscribe("arduino/temp")
#remain connected to broker
while mqttc.loop() == 0:
pass
# handle app closure
except (KeyboardInterrupt):
print "Interrupt received"
cleanup()
except (RuntimeError):
print "uh-oh! time to die"
cleanup()
Node.js
Souscription avec Node.js
npm install mqtt BROKER=10.0.1.3 node subscribe.js $BROKER 1883 arduino/temp
#!/usr/bin/env node
var mqtt = require('mqtt');
var argv = process.argv;
for (var i = 2; i <= 4; i++) {
if(!argv[i]) process.exit(-1);
}
var port = argv[3]
, host = argv[2]
, topic = argv[4];
var c = mqtt.createClient(port, host);
c.on('connect', function() {
c.on('message', function(topic, message) {
console.log(topic + ' ' + message);
});
c.subscribe(topic);
});
Simple Souscripteur avec une interface Web
npm install mqtt npm install express BROKER=10.0.1.3 node webmqttsub.js $BROKER 1883 arduino/# node webmqttsub.js $BROKER 1883 bbc/#
curl http://localhost:3000/* curl http://localhost:3000/arduino/ curl http://localhost:3000/arduino/temp
// Simple Gateway between MQTT et HTTP (REST)
// Author: Didier Donsez, 2013
// TODO add PUT, POST (publish), DELETE method
var mqtt = require('mqtt');
var argv = process.argv;
for (var i = 2; i <= 4; i++) {
if(!argv[i]) process.exit(-1);
}
var brokerport = argv[3]
, brokerhost = argv[2]
, subscribedtopicpattern = argv[4];
var topics = new Object();
var c = mqtt.createClient(brokerport, brokerhost);
c.on('connect', function() {
c.on('message', function(topic, message) {
console.log("receive "+ topic + ' ' + message);
topics[topic]=message;
});
console.log("subscribe to " + subscribedtopicpattern);
c.subscribe(subscribedtopicpattern);
});
var express = require('express');
var app = express();
app.use(function(req, res, next){
var path=req.path;
console.log('req.path=' + path);
if(path=="/*"){
res.set('Content-Type', 'application/json');
res.send(JSON.stringify(topics));
} else {
var topic=path.substring(1);
if (topics.hasOwnProperty(topic)) {
if(req.get('Content-Type')=='application/json'){
res.set('Content-Type', 'application/json');
res.send( '{"'+topic+'","'+topics[topic]+'"}' );
} else {
res.set('Content-Type', 'text/plain');
res.send(topics[topic]);
}
} else {
res.send(404, 'Sorry, we cannot find topic '+ topic);
}
}
});
app.listen(3000);
Publication avec Node.js
#!/usr/bin/env node
var mqtt = require('mqtt');
var argv = process.argv;
for (var i = 2; i <= 5; i++) {
if(!argv[i]) process.exit(-1);
}
var port = argv[3]
, host = argv[2]
, topic = argv[4];
, message = argv[5];
var c = mqtt.createClient(port, host);
c.on('connect', function() {
c.on('subscribe', function(packet) {
console.log("subscribe" + ' ' + packet);
});
c.on('close', function() {
console.log("close");
});
c.on('disconnect', function(packet) {
console.log("disconnect" + ' ' + packet);
});
c.on('error', function(e) {
console.log("error" + ' ' + packet);
});
c.publish(topic, message);
c.end();
});
npm install mqtt BROKER=10.0.1.3 node publish.js $BROKER 1883 arduino/temp "99999999"
Bridge HTTP REST <--> MQTT
BROKER=test.mosquitto.org mosquitto_sub -h test.mosquitto.org -t "arduino/#" -v
BROKER=test.mosquitto.org mosquitto_pub -h test.mosquitto.org -t "arduino/temp" -m "246"
BRIDGE=http://test-mosquitto.herokuapp.com curl $BRIDGE/arduino/temp curl -X PUT --data-binary "247" $BRIDGE/arduino/temp curl -X POST --data-binary "248" $BRIDGE/arduino/temp ... curl -X DELETE $BRIDGE/arduino/temp
Node-RED with Mosquitto
TODO with Arduino, Firmata, MQTT
Start the subscription
BROKER=test.mosquitto.org mosquitto_sub -h $BROKER -d -t smartgrid/uk
BROKER=m2m.eclipse.org mosquitto_sub -h $BROKER -d -t /smartgrid/uk/demand
Install Terminal Notifier (MacOS X 10.8)
sudo gem install terminal-notifier
node red.js flow_mqttoutput.json
Browse http://127.0.0.1:1880
Import (Ctrl-I) the following flow then deploy. The flow gets UK smartgrid data from http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx every minute, then parses it into a message then publishes the message on the smartgrid/uk topic of the test.mosquitto.org public server. The flow is inspired from the Node-RED documentation.
flow_mqttoutput.json
[{"id":"1d2be19e.6a1c1e","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"5ff55c38.ef0014","type":"mqtt-broker","broker":"m2m.eclipse.org","port":"1883"},{"id":"26aba4b0.704bb4","type":"mqtt out","name":"Mosquitto Server","topic":"smartgrid/uk","broker":"5ff55c38.ef0014","x":541,"y":698,"wires":[]},{"id":"963ae491.d8ac68","type":"debug","name":"Debug MQTT","active":true,"complete":"false","x":612,"y":497,"wires":[]},{"id":"b52ffa92.d6b9c","type":"http request","name":"UK National Grid Open Data","method":"GET","url":"http://www.nationalgrid.com/ngrealtime/realtime/systemdata.aspx","x":171,"y":499,"wires":[["dbe1deb4.42e0d8","778334c9.ca3c3c"]]},{"id":"dbe1deb4.42e0d8","type":"function","name":"Parsing Grid Data","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 3) {\n msg.payload = {};\n msg.payload.demand = parseInt(words[0].split(\":\")[1]);\n msg.payload.frequency = parseFloat(words[2].split(\":\")[1]);\n msg.payload.time = words[1].split(\">\")[1];\n \n // Create the true/false signal based on the frequency.\n msg2 = {};\n msg2.payload = (msg.payload.frequency >= 50) ? true : false;\n \n return [msg,msg2];\n }\n }\n return null;","outputs":1,"x":312,"y":600,"wires":[["26aba4b0.704bb4"]]},{"id":"1e20ecfd.c0c393","type":"inject","name":"Every minute","topic":"","payload":"","repeat":"0","crontab":"*/1 0-22 * * *","once":true,"x":147,"y":378,"wires":[["b52ffa92.d6b9c"]]},{"id":"c3092d54.828b5","type":"exec","command":"terminal-notifier -title 'Electricity Demand' -message ","append":"","useSpawn":"","name":"Notify","x":732,"y":420,"wires":[[],[],[]]},{"id":"62d8096.aa27f78","type":"mqtt out","name":"Eclipse Paho Server","topic":"smartgrid/uk/demand","broker":"5ff55c38.ef0014","x":563,"y":586,"wires":[]},{"id":"778334c9.ca3c3c","type":"function","name":"Parsing Demand Only","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\n // does a simple text extract parse of the http output to provide an\n // object containing the uk power demand, frequency and time\n \n console.log(msg.payload);\n \n if (~msg.payload.indexOf('<BR')) {\n var words = msg.payload.split(\"div\")[1].split(\"<BR\");\n if (words.length >= 1) {\n\n msg.payload = parseInt(words[0].split(\":\")[1]);\n \n return msg;\n }\n }\n return null;","outputs":1,"x":398,"y":437,"wires":[["62d8096.aa27f78","a4521ff1.b25f98","963ae491.d8ac68","c3092d54.828b5"]]},{"id":"a4521ff1.b25f98","type":"mqtt out","name":"Mosquitto Server - Demand only","topic":"/smartgrid/uk/demand","broker":"1d2be19e.6a1c1e","x":553,"y":379,"wires":[]}]
From another machine, start :
node red.js flow_mqttinput.json
flow_mqttinput.json
[{"id":"f7611a87.089ee8","type":"mqtt-broker","broker":"test.mosquitto.org","port":"1883"},{"id":"80408ea7.7fbf7","type":"mqtt in","name":"UK Smart grid","topic":"smartgrid/uk ","broker":"f7611a87.089ee8","x":920,"y":291,"wires":[["f4f3a91.f0b0c58","ca880431.3577f8","5eb0c39b.a14f3c"]]},{"id":"f4f3a91.f0b0c58","type":"debug","name":"","active":true,"complete":"false","x":1172,"y":296,"wires":[]},{"id":"ca880431.3577f8","type":"function","name":"Console Log","func":"// The received message is stored in 'msg'\n// It will have at least a 'payload' property:\n// console.log(msg.payload);\n// The 'context' object is available to store state\n// between invocations of the function\n// context = {};\n\nconsole.log(msg.payload);\nreturn msg;","outputs":1,"x":1191,"y":341,"wires":[[]]},{"id":"5eb0c39b.a14f3c","type":"notify","title":"","name":"","x":1199,"y":400,"wires":[]}]
MQTT Panel
MQTT Panel est une application (Node.js) de visualisation de données publiées sur des topics MQTT. Elle utilise JQuery.sparkline.
Telechargez https://github.com/fabaff/mqtt-panel
Term 1:
mosquitto /usr/local/sbin/mosquitto
Term 2:
node server.js
open index.html
Term 3:
./test-messages.py
InfluxDB
Persitance avec InfluxDB