Welcome to MQTT/UDP¶
Package version 0.5
You can get this document in PDF format.
Introduction¶
MQTT/UDP is a simplest possible protocol for IoT, smart home applications and robotics. As you can guess from its name, it is based on MQTT (which is quite simple too), but based on UDP and needs no broker.
Fast track for impatient readers: MQTT/UDP native implementations exist in Java, Python, C, Lua and PLC specific ST language. See corresponding references:
- C Language API Reference
- Java Language API Reference
- Python Language API Reference
- Lua Language API Reference
- CodeSys ST Language API Reference
If you want to test MQTT/UDP on a real hardware, take a look at Sketches part. Ready made software is described in Integration and tools part.
Now some words on MQTT/UDP idea. It is quite simple. Broker is a single point of failure and can be avoided. Actual traffic of smart home installation is not too big and comes over a separated (by firewall) network. There are many listeners that need same data, such as:
- main UI subsystem (such as OpenHAB installation)
- special function controllers (light, climate units)
- per-room or per-function controllers (kitchen ventilation, bath room sensors, room CO2 sensors, etc)
- in-room displays (room and outdoor temperature)
All these points generate some information (local sensors, state) and need some other information.
So, MQTT/UDP is sending data with UDP broadcast. It means that every message is simuloneusly sent to all possible recipients with just one network packet.
Every listener selects packets it wants to listen to and processes them as it wishes.
As a result, minmal MQTT/UDP implementation is extremely simple. Though, there are more options exist which are described later.
Main use cases for MQTT/UDP are covered below.
Data exchange¶
Main and, for most applicartions, the only use case. It is really simple. Sender transmits one PUBLISH packet per message. Packet contains topic name (such as “rooms/dinner/temperature”) and value. Value can be text string or binary data, but most programs will wait for text as packet value.
As there is no broker, parties do not need such things as CONNNECT, SUBSCRIBE or anything else but PUBLISH message.
All the MQTT/UDP programs on the network will receive message and decide if they need it.
Reliable exchange¶
Sender transmits PUBLISH message with non-zero QoS field. Receiver replies with PUBACK packet. If no acknowledge received, sender re-sends message.
Current libraries do not support this scenario out of the box, but it can be implemented by user code. Later versions of libraries will have this case implemented.
Data request¶
There is request-reply scenario possible. Requesting party sends SUBSCRIBE message, one that is responcible for requested topic replies with PUBLISH message.
This scenario can be used for remote configuration use case: configuration daemon keeps set of topics and configuration settings per topic, but does not send them to not to spam network with rarely needed data. Some IoT device turns on and requests topics that contain needed configuration parameters, gets needed settings and continues working.
If configuration settings are changed, config server re-publishes corresponding topics to update device settings.
Please see Java config.Provider
and config.Requester
classes for further info.
There is also a ready simple remote configuration server in tools/config_server
.
Topic request¶
There is reverse scenario possible. Remote configuration program can send SUBSCRIBE message for topic, that is a wildcard for all possible configuration topics for device or all devices. Devices should respond back with PUBLISH messages for all the configurable items.
See Passive remote configuration for more details.
Discovery¶
Party that needs to find who is on the network sends PINGREQ request. All the others reply with PINGRESP messages, and requester builds a map of all active MQTT/UDP hosts on the network.
Possible topologies¶
Here is a list of more or less obvious scenarios for MQTT/UDP
Fault-tolerant sensors¶
Some 2-4 temperature sensors are placed in one room and send updates every 10 seconds or so. Update topic is the same for all the sensors, so that every reader gets mix of all the readings.
Reader should calculate average for last 4-8 readings.
Result: reader gets average temperature in room and failure of one or two sensors is not a problem at all.
Trying to build corresponding configuration with traditional MQTT or, for example, Modbus you will have to:
- Setup broker
- Setup transport (topic names) for all separate sensors
- Setup some smart code which detects loss of updates from sensors
- Still calculate average
- Feed calculated average back if you want to share data with other system nodes
One sensor, many listeners¶
IoT network is a lot of parties operating together. It is usual that many of them need one data source to make a decision. Just as an example, my house control system consists of about 10 processing units of different size. Many of them need to know if it is dark outside, to understand how to control local lighting. Currently I have to distribute light sensor data via two possible points of failure - controller it is connected to and OpenHub software as a broker. I’m going to swithch to MQTT/UDP and feed all the units directly.
Multiple smart switches¶
Some wall switches are controlling the same device. All of them send and read one topic which translates on/off state for the device.
Of course, if one switch changes the state, all others read the state broadcast and note it, so that next time each switch knows, which state it should switch to.
It is possible, of course, that UDP packet from some switch will be lost. So when you switch it, nothing happens. What do you do in such a situation? Turn switch again, of course, until it works!
In this example I wanted to illustrate that even in this situation UDP transport is not really that bad.
Roadside processor¶
Data processors such as triggers, unit converters, calculators of different kinds can be easily implemented with MQTT/UDP as standalone script or a small program that just listens for required incoming data, performs calculations and sends results back to MQTT/UDP.
Other script or IoT/SmartHome component can then use resulting data.
There is an example of such combination in MQTT/UDP repository. Java
program in tools/tray
is setting up desktop tray informer which
displays some MQTT/UDP parameters if user clicks on tray icon.
Companion script lang/python3/examples/trigger.py
is listening to
some topic and if topic value is out of range sends information on
tray/message
topic with a worning. Tray program listens for that
topic and displays a warning to user on reception of such a message.
System debugging¶
Broadcast/multicast nature of MQTT/UDP lets you see what is going on on the “bus” exactly the same way as all the parties see. There is a simple tool exist for that in this repository, but you can use, for example well known WireShark as well.
Reliability¶
Note
There’s QoS support for MQTT/UDP is in development, which makes it as reliable as TCP version.
As MQTT/UDP is based on UDP protocol, which does not guarantee packet delivery, one can suppose that MQTT/UDP is not reliable. Is it?
Not at all.
If we use it for repeated updates, such as sensor data transfer, UDP is actually more reliable, than TCP! Really. If our network drops each second packet, TCP connection will be effectively dead, attempting to resend again and again outdated packets which are not needed anymore. And MQTT/UDP will just loose half of readings, which is not really a problem for 99% of installations. So, TCP helps just if packet loss rate is quite low.
Actualy, simple test was made [1] to ckeck UDP reliability. One host in my house’s local net was generating MQTT/UDP traffic as fast as possible and other checked packets to be sequent, counting speed and error rate. Two IPTV units was started to show HD content and one of the computers was copying some few GBytes to file server. Result was quite surprising: MQTT/UDP error rate grew to… 0.4% with about 50K packets/second, but TV sets stopped showing, being, obviusly, starved.
Anyway, I’m going to add completely reliable mode to MQTT/UDP in near future.
Footnotes
[1] | Corresponding tools are in repository and you can run such test yourself. |
Speed limit¶
There is one more reliability issue exist when we use UDP. Low power microcontrollers are quite slow and their ability to receive lots of UDP packets per second are limited. There is possible packet loss due to low processing power of some slow nodes, not because of network delivery is not reliable.
That’s why protocol implementations include throttling subsystem, which limits amount of packets sent per time interval.
By default it is tuned for maximum of 10 packets per second. Java and Python implementations use millisecond timing and send max of 3 packets with no speed limit, and then add 300 msec pause. C implementation currently uses 1 second time granularity and lets application send up to 10 packets with no limit and then waits for a second.
Actual tests of reception speed capability were done with Wemos D1 Mini unit programmed with MQTT/UDP Lua implementation.
There is set_throttle
/setThrottle
function in all languages but Lua, which lets you set
speed limit according to your hardware capabilities, or disable it at all by setting to 0.
Remote configuration¶
MQTT/UDP can be used as a remote configuration protocol. There are two basic modes of remote configuration: acive and passive. Active remote configuration mode is needed when node to be configured has no local storage at all and has to request all settings at each restart. Passive mode is good for nodes that keep settings from start to start (in local FS or NVRAM) and configuration process is executed for them once or rarely.
Passive remote configuration¶
Passive remote configuration is divided in three steps. First, when user starts configurator GUI application, application sends (and repeats from time to time) a SUBSCRIBE request. Second, any MQTT/UDP nodes that support remote configuration see that request and answer with PUBLISH packet for all items that can be set up. Last, configurator application sends back message with a new item value.
All the configurable items must have special topic name: $SYS/conf/{node-id}/{kind}/{name}
.
For example: $SYS/conf/024F55A20001/info/uptime
.
Node id¶
Node id is any string that will not change at least during one program run or node start. If just one MQTT/UDP program is run on this hardware, you can use net MAC address as id. Otherwise some GUID will do. Note that node-id can be configurable item too!
Item kind¶
There are four predefined kinds:
info
- This kind is a readonly description which can not be configured
but tells other side about us. There are
info/soft
- name of program,info/ver
- version of program,info/uptime
- as is. node
- This kind describes setting for program/device in general and is editable by user. Name and location are obvious items to be in this kind. Any other global (not related to specific function or input/output) setting can be added to this kind.
topic
- This kind is treated as configurable value that is a topic name used to send or receive data. Item name must be descriptive so that user can understand what it is about.
net
- Reserved for network settings.
You can use other kind strings as you wish. They will not be interpreted in any way.
Item name¶
Just a descriptive name of an item, quite short but understandable by human. In a later version there will be added long human readable description, but in any case item name must be reasonable.
Predefined items¶
Protocol implementations expect some item kind/name pairs to mean special thins. Here is a list of know ones.
info/soft
- Readonly, name of software running in this node.
info/ver
- Readonly, version of software.
info/uptime
- Text string, uptime: “255d 02:12:34” or “5y 2m 13d 23:55:01”
node/name
- Name of this MQTT/UDP node. Human readable, not interpreted. “Weather sensors” or “A/C control”
node/location
- Where you can find it: “Kitchen”, “building 12, 2nd floor, room 212”, whatever.
net/mac
- Network MAC address, if not hardwired.
net/ip
- Network IP address, if static.
net/mask
- Netmask.
net/router
- Network default route.
Other network settings can be put to net
kind.
Implementations¶
Configurator GUI tool exists as part of big Java MQTT/UDP viewer program,
see tools/viewer
and build/mqtt_udp_view.*
. Clien implementations
examples are done for most project languages.
- Java
RemoteConfig
class inru.dz.mqtt_udp.config
package. Simple example code is inmain
function of this class.- Python
- Example code is in
lang/python3/examples/mqtt_udp_rconfig.py
. - C
- Simple example code is in
lang/c/examples/rconfig.c
. More advanced embedded atmega microcontroller example application is in a separate repository, see https://github.com/dzavalishin/smart-home-devices/tree/master/mmnet_mqt_udp_server/main - Lua
- Lua example is up and running, but as Lua has no threads,
adding this example to a real program will require quite
a clever tailoring. See
lang/lua/examples/mqtt_rconfig.lua
Active remote configuration¶
Active remote configuration is even simpler than passive one. Starting node just requests in a loop all the items it needs to run. Configuration server replies with settings. Node continues working when all critical data is received. Server can proactively update settings later if some of them are changed.
Implementations¶
Currently this mode is implemented in Java as classes Requester
and Provider
(ru.dz.mqtt_udp.config
package) and server application tools/config_server
.
See also build/config_server.sh
.
Packets and general logic¶
Differences from MQTT¶
MQTT/UDP is based on classic MQTT, but differs a bit. First of all, just a subset of packet types used and, of course, as there is no broker there is no need for CONNECT, CONNACK or DISCONNECT.
Additionally, MQTT/UDP does not send or expect variable header (packet ID field) present in some MQTT packets.
Current implementation also ignores packet flags completely, but it will change later.
Most implementations support Tagged Tail Records addition to the protocol, which extends and replaces variable header in an extensible and flexible way.
Tagged tail records can be used to add any kinds of additional information to the classic MQTT packets, but the most noticeable use of TTRs in MQTT/UDP is digital signature.
Please read detailed description at project Wiki.
Packet types and use¶
- PUBLISH
It is extremely simple to use MQTT/UDP. Basic use case is: one party sends PUBLISH packets, other receives, selecting for itself ones with topics it needs. That is all. No connect, no subscribe, no broker address to configure - we’re broadcasting.
For most applications it is all that you need. But there are 3 other packet types that possibly can be used.
- SUBSCRIBE
- MQTT/UDP uses this as a request to resend some topic value. It is not automated in any way by library code (but will be), so you have to respond to such a packet manually, if you want. It is intended for remote configuration use to let configuration program to request settings values from nodes. This is partially implemented.
- PINGREQ
- Ping request, ask all nodes to reply. This is for remote configuration also, it helps config program to detect all nodes on the network.
Library code automatically replies to
PINGREQ
withPINGRESP
. - PINGRESP
- Reply to ping. You don’t need to send it manually. It is done automatically.
It is supposed to use PUBACK packet later to support reliable delivery.
Topic names¶
One important thing about topics is $SYS topic. MQTT/UDP is a broadcast environment, so each node which wants to use $SYS must distinguish itself by adding MAC address or other id as a subtopic under $SYS: $SYS/{group}/02AF03E6235C. Topic name $SYS/conf/{host-id} is to be used for configurable parameters.
One more special thing I’m going to use is $META topic name suffix. It will possibly be used to request/send topic metadata. For example, if we have kitchen/temperature topic, then kitchen/temperature/$META/name can be used to pass printable topic name, and kitchen/temperature/$META/unit - to send measuring unit name.
API Reference¶
MQTT/UDP is implemented in five languages, but implementations differ. Most complete and fast developing are Java and Python versions. Others follow a bit later. Please see map of languages and features on a project Wiki.
C Language API Reference¶
There is a native MQTT/UDP implementation in C. You can browse sources at https://github.com/dzavalishin/mqtt_udp/tree/master/lang/c repository.
Lets begin with a simplest examples.
Send data:
int rc = mqtt_udp_send_publish( topic, value );
Listen for data:
int main(int argc, char *argv[])
{
...
int rc = mqtt_udp_recv_loop( mqtt_udp_dump_any_pkt );
...
}
int mqtt_udp_dump_any_pkt( struct mqtt_udp_pkt *o )
{
printf( "pkt %x flags %x, id %d", o->ptype, o->pflags, o->pkt_id );
if( o->topic_len > 0 )
printf(" topic '%s'", o->topic );
if( o->value_len > 0 )
printf(" = '%s'", o->value );
printf( "\n");
}
Now lets look at the packet structure definition:
struct mqtt_udp_pkt
{
int from_ip;
int ptype; // upper 4 bits, not shifted
int pflags; // lower 4 bits
size_t total;
int pkt_id;
size_t topic_len;
char * topic;
size_t value_len;
char * value;
char is_signed;
};
- from_ip
- Ip address of message sender. Usually ignored.
- ptype
- Packet type. You will be interested in
`PTYPE_PUBLISH`
most of time. See`mqtt_udp_defs.h`
for more. - pflags
- Flags specific for each type. Ignore. Current version of MQTT/UDP does not use them at all, and in any case everything critical will be processed by library.
- total
- This field is internal for library.
- pkt_id
- Packet id. Leave zero for outgoing packets, and sequential number will be provided. In incoming packets it will be filled if sender supports TTRs (extended packet structure).
- topic and topic_len
- Message topic, NULL terminated. Length of topic in bytes.
- value and value_len
- Message value, also NULL terminated. Length of value in bytes.
- is_signed
- This packet has correct digital signature.
Listen for packets¶
See Example C code.
For listening for data from the network you need just some of fields. First, you have to check that packet is transferring item data:
struct mqtt_udp_pkt p;
if( p->ptype == PTYPE_PUBLISH )
{
// Got data message
}
For the first implementation just ignore all other packets. Frankly, there’s not much for you to ignore.
Now get topic and data from packet you got:
strlcpy( my_value_buf, p->value, sizeof(my_data_buf) );
strlcpy( my_topic_buf, p->topic, sizeof(my_topic_buf) );
And you’re done, now ypou have topic and value received.
Functions¶
Send PUBLISH packet:
int mqtt_udp_send_publish( char *topic, char *data );
Send SUBSCRIBE packet:
int mqtt_udp_send_subscribe( char *topic );
Send PINGREQ packet, ask others to respond:
int mqtt_udp_send_ping_request( void );
Send PINGRESP packet, tell that you’re alive:
int mqtt_udp_send_ping_responce( void );
Start loop for packet reception, providing callback to be called when packet arrives:
typedef int (*process_pkt)( struct mqtt_udp_pkt *pkt );
int mqtt_udp_recv_loop( process_pkt callback );
Dump packet structure. Handy to debug things:
int mqtt_udp_dump_any_pkt( struct mqtt_udp_pkt *o );
Set minimal time between outgoing packets (msec), control maximum send speed:
void mqtt_udp_set_throttle(int msec);
Set callback to handle internal errors such as net IO error:
typedef enum {
MQ_Err_Other,
MQ_Err_Establish, // open socket
MQ_Err_IO, // net io
MQ_Err_Proto, // broken pkt
MQ_Err_Timeout,
} mqtt_udp_err_t;
typedef int err_func_t( mqtt_udp_err_t type, int err_no , char * msg, char * arg );
void mqtt_udp_set_error_handler( err_func_t *handler );
User error handler can:
- Return zero: caller must attempt to ignore error, if possible.
- Return err_no: caller must return with error in turn, if possible.
- Exit (or restart application completely) if error is supposed to be fatal.
Handler can also be used for logging.
Digital signature¶
There is implementation of digital signature implemented. To use it call mqtt_udp_enable_signature
passing encryption key. Same key must be used on all nodes that use signature. Nodes that have no
signature turned on will not sign outgoing messages and will ignore incoming signatures.
Start using signature:
int mqtt_udp_enable_signature( const char *key, size_t key_len );
- key
- Key used to sign outgoing messages and check signature on incomnig ones. Usually just an ASCII string, but can be any binary data.
- key_len
- Number of valid bytes in key.
If signature is turned on and incoming packet is correctly signed, it
will have nonzero is_signed
field.
Service¶
Match topic name against a pattern, processing + and # wildcards, returns 1 on match:
int mqtt_udp_match( const char *wildcard, const char *topic_name );
Remote configuration¶
This part of API lets user to configure program/device by network. There is a detailed description in Passive remote configuration and in the Python part of this book, see Module mqttudp.rconfig. Here is description of C implementation.
Set up remote config subsystem:
#include "runtime_cfg.h"
mqtt_udp_rconfig_item_t rconfig_list[] =
{
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_TOPIC, "Switch 1 topic", "topic/sw1", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_TOPIC, "Switch 2 topic", "topic/sw2", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_TOPIC, "Di 0 topic", "topic/di0", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_TOPIC, "Di 1 topic", "topic/di1", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_OTHER, "MAC address", "net/mac", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_INFO, "Switch 4 topic", "info/soft", { .s = DEVICE_NAME } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_INFO, "Switch 4 topic", "info/ver", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_INFO, "Switch 4 topic", "info/uptime", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_OTHER, "Name", "node/name", { .s = 0 } },
{ MQ_CFG_TYPE_STRING, MQ_CFG_KIND_OTHER, "Location", "node/location", { .s = 0 } },
};
int rconfig_list_size = sizeof(rconfig_list) / sizeof(mqtt_udp_rconfig_item_t);
int rc = mqtt_udp_rconfig_client_init( mac_string, rconfig_rw_callback, rconfig_list, rconfig_list_size );
if( rc ) printf("rconfig init failed, %d\n", rc );
Each array item is one parameter to be set up remotely. The only type supported now is MQ_CFG_TYPE_STRING
. Kinds:
- MQ_CFG_KIND_INFO
- Read-only information about this instance (program or device)
- MQ_CFG_KIND_TOPIC
- Is a configurable topic name, used to publish or receive information.
- MQ_CFG_KIND_OTHER
- Any other parameter type. (R/W and not topic)
Third item field is human-readable item description, currently it is not used, but will be translated to
configuration tool. Fourth item is identification of configurable item, both for local and remote side.
For remote side it is sent as part of configuration message topic and is shown to user as configuration
iem description. Last field is current parameter value. For read-only parameters you can just put any string
pointer here. For R/W string must be malloc’ed (or set with mqtt_udp_rconfig_set_string()
).
To be precise:
/// Definition of configuration parameter
typedef struct
{
mqtt_udp_rconfig_item_type_t type; ///< Item (.value field) data type (string, bool, number, other)
mqtt_udp_rconfig_inetm_kind_t kind; ///< Item kind, not processed by network code
const char * name; ///< Human readable name for this config parameter
const char * topic; ///< MQTT/UDP topic name for this config parameter
mqtt_udp_rconfig_item_value_t value; ///< Current value
mqtt_udp_rconfig_item_value_t opaque; ///< user data item, not processed by MQTT/UDP code at all
} mqtt_udp_rconfig_item_t;
- type
- Data type for
.value
, must be MQ_CFG_TYPE_STRING as for now. - kind
- Kind of item, see above. If kind is MQ_CFG_KIND_TOPIC,
.topic
field must begin with “topic/”. - name
- Human-readable description, unused now.
- value
- Current value. You will be using
.value.s
union field. - opaque
- Not used or interpreted, use as pointer to external storage for this item, internal item index or function pointer to read/set item as you wish.
Now lets look at available functions.
Init subsystem:
int rc = mqtt_udp_rconfig_client_init( mac_string, rconfig_rw_callback, rconfig_list, rconfig_list_size );
- mac_string
- Id string (12 bytes) used as unique id of this configurable instance. MAC address of device is a good candidate.
- rconfig_rw_callback
- Callback called by subsystem to ask you provide current value for item or get new setting
after instance item was remotely set up. Prototype is
int rconfig_rw_callback( int pos, int write )
, wherepos
is item position (index) in array andwrite
is nonzero if callback shall get new setting from instance array and save it somewhare for next run. If zero, callback must read saved instance value and callmqtt_udp_rconfig_set_string()
for it.
Set item value:
int mqtt_udp_rconfig_set_string( int pos, char *string );
- pos
- Item position (index) in array
- string
- New value
Get item value checking kind:
const char * rconfig_get_string_by_item_index( int pos, mqtt_udp_rconfig_inetm_kind_t kind );
- pos
- Item position (index) in array
- kind
- Expected kind for item. If not, global error callback is called and
NULL
is returned. This function is supposed to be used to get configurable topic for outgoing message so usually this parameter isMQ_CFG_KIND_TOPIC
.
Find item by .value
string:
int rconfig_find_by_string_value( const char *search, mqtt_udp_rconfig_inetm_kind_t kind );
- search
- String value to be found.
- kind
- Only lines of this kind will match. This function is supposed to look up
incoming items topics to find if some of configurable topics match. So
this parameter usually is
MQ_CFG_KIND_TOPIC
.
Please study this API use example in sample remote config C application.
UDP IO interface¶
Default implementation uses POSIX API to communicate with network, but for embedded use you can redefine corresponding functions. Here are things to reimplement.
Receive UDP packet. Returning value is number of bytes in packet received or
negative error code. Must return sender’s address in src_ip_addr
:
int mqtt_udp_recv_pkt( int fd, char *buf, size_t buflen, int *src_ip_addr );
Broadcast UDP packet:
int mqtt_udp_send_pkt( int fd, char *data, size_t len );
Send UDP packet (actually not used now, but can be later):
int mqtt_udp_send_pkt_addr( int fd, char *data, size_t len, int ip_addr );
Create UDP socket which can be used to send or broadcast:
int mqtt_udp_socket(void);
Prepare socket for reception on MQTT_PORT:
int mqtt_udp_bind( int fd )
Close UDP socket:
int mqtt_udp_close_fd( int fd )
Java Language API Reference¶
There is a native MQTT/UDP implementation in Java. You can browse sources at https://github.com/dzavalishin/mqtt_udp/tree/master/lang/java repository.
Again, here are simplest examples.
Send data:
PublishPacket pkt = new PublishPacket(topic, value);
pkt.send();
Listen for data:
PacketSourceServer ss = new PacketSourceServer();
ss.setSink( pkt -> {
System.out.println("Got packet: "+pkt);
if (p instanceof PublishPacket) {
PublishPacket pp = (PublishPacket) p;
}
});
Listen for packets¶
See Example Java code.
Here it is:
package ru.dz.mqtt_udp.util;
import java.io.IOException;
import java.net.SocketException;
import ru.dz.mqtt_udp.IPacket;
import ru.dz.mqtt_udp.MqttProtocolException;
import ru.dz.mqtt_udp.SubServer;
public class Sub extends SubServer
{
public static void main(String[] args) throws SocketException, IOException, MqttProtocolException
{
Sub srv = new Sub();
srv.start();
}
@Override
protected void processPacket(IPacket p) {
System.out.println(p);
if (p instanceof PublishPacket) {
PublishPacket pp = (PublishPacket) p;
// now use pp.getTopic() and pp.getValueString() or pp.getValueRaw()
}
}
}
Now what we are doung here. Our class Sub
is based on SubServer
, which is doing all the reception job, and calls processPacket
when it got some data for you. There are many possible types of packets, but for now we need just one, which is
PublishPacket
. Hence we check for type, and convert:
if (p instanceof PublishPacket) {
PublishPacket pp = (PublishPacket) p;
Now we can do what we wish with data we got using pp.getTopic()
and pp.getValueString()
.
Listen code we’ve seen in a first example is slightly different:
PacketSourceServer ss = new PacketSourceServer();
ss.setSink( pkt -> {
System.out.println("Got packet: "+pkt);
if (p instanceof PublishPacket) {
PublishPacket pp = (PublishPacket) p;
}
});
Used here PacketSourceServer
, first of all, starts automatically, and uses Sink
you pass to setSink
to pass packets received to you. The rest of the story is the same.
There is another, more complex listen server class, `PacketSourceMultiServer`
.
Instance of it can provide incoming packets to more than one listener:
PacketSourceMultiServer ms = new PacketSourceMultiServer();
ms.addPacketSink( first_listener );
ms.addPacketSink( next_listener );
ms.start(); // Does not start automatically
Packet classes¶
There are PublishPacket
, SubscribePacket
, PingReqPacket
and PingRespPacket
. Usage is extremely simple:
new PingReqPacket().send();
Service¶
Match topic name against a pattern, processing + and # wildcards, returns true on match:
TopicFilter tf = new TopicFilter("aaa/+/bbb");
boolean matches = tf.test("aaa/ccc/bbb") );
TopicFlter is a Predicate (functional interface implementation).
Control¶
- setMuted( boolean mute )
- PacketSourceServer and PacketSourceMultiServer can be swiched to not to reply to any incoming packet (such as PING) automatically.
- Engine.setThrottle(int msec)
- Set average time in milliseconds between packets sent. Set to 0 to turn throttling off.
Error handling¶
There is a global error handler which is called from library code on
IO, protocol and some other errors. Default handler just prints error message
on stderr
. You can install your own error handler with call to
GlobalErrorHandler.setHandler(BiConsumer<ErrorType, String> h)
.
Digital Signature¶
Note
Development is in progress, not a final implementation.
Java ru.dz.mqtt_udp.Engine
class has preliminary controls for message digital
signature. It is implemented with HMAC MD5 technology.
Set signature secret key to sign and check signature:
void setSignatureKey(String key)
Set requirement for all incoming packets to be signed:
void setSignatureRequired(boolean req)
At this moment other language implementations ignore and do not generate signature at all.
Python Language API Reference¶
As you already guessed, python implementation is native too. You can browse sources at https://github.com/dzavalishin/mqtt_udp/tree/master/lang/python3 repository. There is also lang/python directory, which is for older 2.x python environment, but it is outdated. Sorry, can’t afford to support it. If you need python 2.x, you can backport some python3 code, it should be quite easy.
Let’s begin with examples, as usual.
Send data:
mqttudp.engine.send_publish( "test_topic", "Hello, world!" )
Listen for data:
def recv_packet(pkt):
if pkt.ptype != me.PacketType.Publish:
print( str(pkt.ptype) + ", " + pkt.topic + "\t\t" + str(pkt.addr) )
return
print( pkt.topic+"="+pkt.value+ "\t\t" + str(pkt.addr) )
mqttudp.engine.listen(recv_packet)
Module mqttudp.engine¶
Main package, implements MQTT/UDP protocol.
Paccket class:
class PacketType(Enum):
Unknown = 0
Publish = 0x30
Subscribe = 0x80
PingReq = 0xC0
PingResp = 0xD0
class Packet(object):
def __init__( self, ptype, topic, value, pflags, ttrs ):
self.ptype = ptype
self.pflags = pflags
self.topic = topic
self.value = value
self.ttrs = ttrs
self.addr = None
Functions:
send_ping()
- send PINGREQ packet.send_ping_responce()
- send PINGRESP packet. It is sent automatically, you don’t have to.listen(callback)
- listen for incoming packets.send_publish( topic, payload)
- this what is mostly used.send_subscribe(topic)
- ask other party to send corresponding item again. This is optional.set_muted(mode: bool)
- turn off protocol replies. Use for send-only daemons which do not need to be discovered.
Match topic name against a pattern, processing + and # wildcards, returns True on match:
import mqttudp.engine as me
me.match("aaa/+/bbb", "aaa/ccc/bbb")
Turn of automatic protocol replies:
set_muted(mode: bool)
Set minimum time between packets sent, msec:
set_throttle(msec: int)
Set network address to listen at (choose incoming packets network interface). Address must be equal to address of some network interface:
set_bind_address( "192.168.1.1" )
Set network address to broadcast to (choose outgoing packets network interface). Address must be broadcast address for some of existing network interfaces. Ask local network administrator if unsure:
set_broadcast_address( "192.168.1.255" )
Module mqttudp.config¶
Additional module, sets up configuration file reader. Most command line utilities use it to get settings.
It reads mqtt-udp.ini
file in current directory. Here is an example:
[DEFAULT]
host = smart.
[mqtt-gate] # Settings for MQTT to MQTT/UDP gate
login =
password =
subscribe=#
#host = smart. # See [DEFAULT] above
#blacklist=/topic # Regexp to check if topic is forbidden to relay
#blacklist=/openhab
[openhab-gate]
#port=8080 # There's builtin default
#host = smart. # Settings for MQTT/UDP to OpehHAB gate
#blacklist=/topic # Regexp to check if topic is forbidden to relay
# which sitemap to use for reading data from openhab
#sitemap=default
Usage:
import mqttudp.config as cfg
cfg.setGroup('mqtt-gate') # set ours .ini file [section]
blackList=cfg.get('blacklist') # read setting
Module mqttudp.rconfig¶
Additional module implemening passive remote configuration client (party that is being configured) implementation.
There is a complete demonstration example exist.
To see example working please run mqtt_udp_rconfig.py
first
and mqtt_udp_view after it. In viewer please press middle toolbar button
to open remote configuration window. This window will show all
running MQTT/UDP instances that can be configured. There must be
Python test node
among them. Select its tab. You will see all
the configurable items (from init_items
dictionary) as text
fields. Meanwhile mqtt_udp_rconfig.py
will be sending a random
number with “test” topic. Enter new topic name in a field near
“topic: test” description and press nearest button to send new
setting to program. Notice that now it sends random data with a topic
you just set up.
Now lets look at example code (see examples/mqtt_udp_rconfig.py):
import mqttudp.rconfig as rcfg
init_items = {
## read only
"info/soft" : "Pyton example",
"info/ver" : "0.0",
"info/uptime" : "?",
## common instance info
"node/name" : "Unnamed",
"node/location" : "Nowhere",
# items we want to send out
"topic/test" : "test",
"topic/ai0" : "unnamed_ai0",
"topic/di0" : "unnamed_di0",
"topic/pwm0" : "unnamed_pwm0",
}
def send_thread():
while True:
n = str(random.randint(0, 9))
print( "Send "+n )
rcfg.publish_for( "test", n )
time.sleep(2)
if __name__ == "__main__":
print( "Will demonstrate remote config" )
rcfg.init( init_items )
st = threading.Thread(target=send_thread, args=())
st.start()
mq.listen( rcfg.recv_packet )
This example shows how to use remote configuration subsystem. Dictionary
init_items
contains list of items which can possiblly be configured
remotely. Different elements are used in a different ways.
In general each item in a list is a configurable thing. For example,
"node/location" : "Nowhere"
is item which name is node/location
and initial value is Nowhere
. (It is supposed as a memo for user
to know where an appliance or computer running this code is installed.)
Another example is "topic/ai0" : "unnamed_ai0"
- it is supposed
to be a configurable topic name that device uses to send data from
some analogue input. User must configure topic name and it will be
used by node to send data.
Generally item keys consist of two parts
separated with slash: "topic/pwm0"
, "info/uptime"
or
"node/name"
. Left part is named kind and defines the way item
is processed. Here is a list of known kinds.
info
- Read only description of node/program.
node
- General node information or settings.
topic
- This kind is a configurable value that is a topic name.
net
- Reserved for network settings.
Please see more on kinds in Passive remote configuration.
Lets go on with code. Line rcfg.init( init_items )
sets subsystem
up. Remote config subsystem first loads previous settings from .ini file
(you can set file name with set_ini_file_name(fn)
function) and
fills all items from given init_items
dictionary that was not read
from .ini file. Items with info
kind are taken from init_items
in any way.
After init your program continues working, but must call recv_packet
function of mqttudp.rconfig
for each incoming MQTT/UDP packet for
remote configuration to work.
There are three ways to use configured parameters.
- Just read parameter
- You can just call
get_setting( name )
for needed item to get current configured value. For example,get_setting( "net/mac" )
or, say,lcd_print( get_setting( "node/location" ))
. If your node will be reconfigured in run time, on next call there will be new value. - Send data for configurable topic
- By calling
publish_for( item_of_topic, data )
you will send data to a topic which is configured by item with, guess what,topic
kind. See abovercfg.publish_for( "test", n )
- this line looks up config item namedtopic/test
, and uses its value as a topic to publisn value of variblen
to. - Check incoming packet topic
- On receiving incoming PUBLISH packet, you can use
is_for( topic_item, topic )
function, which checkstopic
parameter to be equal to value of config item named"topic/"+topic_item
, such asis_for( "pwm0", topic )
will returnTrue
iftopic
variable contains string equal to value of config item"topic/pwm0"
.
Only thing left to mention is that you can set callback with call to set_on_config( callback )
and it will be called if remote configuration happens. Config item name and
new value will be passed as parameters.
Module mqttudp.interlock¶
Additional module, implements two classes: Bidirectional
and Timer
.
Bidirectional
is used by bidiractional gateways to prevent loop traffic:
# Init interlock object which will
# forbid reverse direction traffic
# for 5 seconds after message passed
# in one direction.
ilock = mqttudp.interlock.Bidirectional(5)
# Check if we can pass forward
if ilock.broker_to_udp(msg.topic, msg.payload):
mqttudp.engine.send_publish( msg.topic, msg.payload )
print("To UDP: "+msg.topic+"="+str(msg.payload))
else:
print("BLOCKED to UDP: "+msg.topic+"="+str(msg.payload))
# and back
if ilock.udp_to_broker(topic, value):
bclient.publish(topic, value, qos=0)
print( "From UDP: "+topic+"="+value )
else:
print( "BLOCKED from UDP: "+topic+"="+value )
Value is not actually used in current implementation. It is passed for later and smarter versions.
Timer
prevents updates from coming too frequently:
it = mqttudp.interlock.Timer(10)
if it.can_pass( topic, value ):
print("From broker "+topic+" "+value)
mqttudp.engine.send_publish( topic, value )
else:
print("From broker REPEAT BLOCKED "+topic+" "+value)
It checks if value is changed. Such values are permitted to pass through. Unchanged ones will pass only if time (10 seconds in this example) is passed since previous item come through.
Module mqttudp.mqtt_udp_defs¶
This module is not for user code, it is used internally. But you can get library release version from it:
PACKAGE_VERSION_MAJOR = 0
PACKAGE_VERSION_MINOR = 4
Lua Language API Reference¶
You can browse sources at https://github.com/dzavalishin/mqtt_udp/tree/master/lang/lua repository.
Basic examples in Lua.
Send data:
local mq = require "mqttudp"
mq.send_publish( topic, val );
Listen for data:
local mq = require "mqttudp"
local listener = function( ptype, topic, value, ip, port )
print("'"..topic.."' = '"..val.."'".." from: ", ip, port)
end
mq.listen( listener )
Send packets¶
There are functions to send different kinds of packets:
local mq = require "mqttudp"
mq.send_pingreq()
mq.send_pingresp()
mq.send_subscribe( topic )
mq.send_publish( topic, value )
Service¶
Match topic name against a pattern, processing + and # wildcards, returns true
on match:
local mu = require "mqttudp"
local ok = mu.match( wildcard, topic_name )
NodeMCU¶
There is a version of Lua library for NodeMCU microcontroller firmware.
See lang/lua/nodemcu
for examples.
CodeSys ST Language API Reference¶
Note
This implementation ise currently send only.
Sorry, due to PLC limitations, there is no clear API in this code example, just integrated protocol and client code example.
PLC is specific: it runs all its programs in loop and it is assumed that each program is running without blocking and does not spend too much time each loop cycle. There’s usually a watch dog that checks for it. Hence, ST implementation is cycling, sending just one topic per loop cycle.
Actual API is simple:
FUNCTION MQTT_SEND : BOOL
VAR_INPUT
socket : DINT;
topic : STRING;
data : STRING;
sock_adr_out : SOCKADDRESS;
END_VAR
FUNCTION MQ_SEND_REAL : BOOL
VAR_INPUT
socket : DINT;
m_SAddress : SOCKADDRESS;
topic : STRING;
data : REAL;
END_VAR
Here is how it is used in main program:
PROGRAM MQTT_PRG
VAR
STEP : INT := 0;
socket : DINT := SOCKET_INVALID;
wOutPort : INT := 1883;
addr : SOCKADDRESS;
END_VAR
CASE STEP OF
0:
socket := SysSockCreate( SOCKET_AF_INET, SOCKET_DGRAM, SOCKET_IPPROTO_UDP );
addr.sin_family := SOCKET_AF_INET;
addr.sin_port := SysSockHtons( wOutPort );
addr.sin_addr := 16#FFFFFFFF; (* broadcast *)
1: MQ_SEND_REAL( socket, addr, 'PLK0_WarmWater', WarmWaterConsumption );
2: MQ_SEND_REAL( socket, addr, 'PLK0_ColdWater', ColdWaterConsumption );
3: MQ_SEND_REAL( socket, addr, 'PLK0_activePa', activePa_avg );
4: MQ_SEND_REAL( socket, addr, 'PLK0_Va', Va );
ELSE
IF socket <> SOCKET_INVALID THEN
SysSockClose( socket );
END_IF
socket := SOCKET_INVALID;
END_CASE
STEP := STEP + 1;
IF socket = SOCKET_INVALID THEN
STEP := 0;
END_IF
END_PROGRAM
Integration and tools¶
Connectors¶
Project includes two simple connectors. One joins MQTT/UDP with classical MQTT, other connects to OpenHAB.
All the tools read mqtt-udp.ini
file, see Module mqttudp.config for detailed description. You have, at
least, to set host name for both tools.
Classic MQTT¶
It is obvious that MQTT/UDP can be used together with traditional MQTT, so there’s a simple gateway to pass traffic back and forth. It is written in Python and copies everything from one side to another and back. There’s interlock logic introduced that prevents loops by not passing same topic message in reverse direction for some 5 seconds.
To run connector go to lang/python3/examples
directory and start mqtt_bidir_gate.py
program.
There are also unidirectional gates mqtt_broker_to_udp.py
and mqtt_udp_to_broker.py
.
There is an example of service configuration file mqttudpgate.service
for Unix systemctl
service control tools.
This bridge is also can be used to integrate with cloud MQTT servers. Just set up corresponding host/port/login/password
in section [mqtt-gate]
of mqtt-udp.ini
and run mqtt_bidir_gate.py
.
OpenHAB¶
At the moment there are two one way gateways, from MQTT/UDP to OpenHAB and back, and one complete bidirectional gateway.
To run connector go to lang/python3/examples
directory and start mqtt_udp_to_openhab.py
,
openhab_to_udp.py
, or openhab_bidir_gate.py
program.
Minimal configuration required is to set OpenHAB host name in section [openhab-gate]
of mqtt-udp.ini
file.
Gateway uses OpenHAB sitemap to get list of items to read. By default it uses sitemap named default
. If your
OpenHAB setup most populated sitemap is not default one, please set sitemap name in .ini file too.
CCU825 GSM Controller¶
There is a connector for a CCU825 controller in a separate repository.
Programs¶
There are some programs and scripts made to help testing MQTT/UDP library. Some of them are written in C and Java but most exist just in Python version.
C programs¶
- mqtt_udp_clock - sends date and time value to network once a minute. Can be used to set clock in IoT/smarthome peripheral devices. NB! Use SNTP if you need high accuracy.
Java programs¶
- tools/config_server - simple remote configuration server. See corresponding README for details.
Python programs¶
- random_to_udp.py - send random numbers with 2 sec interval, to test reception.
- dump.py - just show all traffic.
- ping.py - send ping and show responces. By using set_muted(mode: bool) function it turns off protocol replies so it will not resond to itself.
- subscribe.py - send subscribe request.
- seq_storm_send.py - send sequential data with no speed limit (use -s to set limit, though).
- seq_storm_check.py - check traffic sent by seq_storm_send.py and calculate speed and error rate.
Traffic viewer¶
A GUI tool to view what’s going on and send data too.
It is supposed that this tool can be used as remote configuration for MQTT/UDP nodes on the network.
To run program go to project root directory and start mqtt_udp_view.cmd
or mqtt_udp_view
depending on your OS. You will
need Java 8 and JavaFX installed for it to run. Please download it from http://java.com or
try to use OpenJDK. (I did not yet.)
Actual user guide is at project Wiki: https://github.com/dzavalishin/mqtt_udp/wiki/MQTT-UDP-Viewer-Help
To run viewer you will need MqttUdpViewer.jar - on any OS java -jar MqttUdpViewer.jar
will start
program. For Windows there is MqttUdpViewer.exe which is a starter for MqttUdpViewer.jar,
so in widows you can start it with MqttUdpViewer
command.
For details please read wiki, but in short, viewer has following parts:
Value view¶
Top list displays current value of all topics that was transmitted since program start. There is also time of last update.
Log view¶
Shows each message passing. You can choose if you will see ping/reply packets or no.
Host list¶
Displays list of network hosts sending MQTT/UDP traffic.
Message Editor¶
Can be used to send messages to network. It is possible to send message just to one host or broadcast them. It is also possible to send SUBSCRIBE messages to request topic data to be sent.
Remote configurator¶
Remote configuration is described in Passive remote configuration in detail. This program implements passive remote configuration mode.
Programs or devices that use MQTT/UDP passive remote configuration feature can be configured by network with this tool. Instances provide list of configurable items and each tab of config window lets you set instance parameters. There is a complete example made for Python, see Module mqttudp.rconfig for description. Implementations of client side also made for C, Java and Lua languages.
System Tray Informer¶
There is a simple program that adds an icon to a system tray. This icon lets you see some data from MQTT/UDP or control one OpenHAB item. Being a Java program it should run on MacOS and Linux, but it was not tested with Linux yet. Illustrations show how it looks in Windows and Mac OS.
Setting up¶
This program reads an mqttudptray.ini
configuration file on start:
topic1=PLK0_activePa
topic2=PLK0_Va
topic1header=Power consumption
topic2header=Mains Voltage
# experimental
#
controltopic=GroupGuestMain
You can define which two topics will be displayed, and what human readable names they have.
The controltopic
setting is for controlling light (or other ON/OFF switch) via
OpenHAB. If defined, Light on and Light off menu items of a tray icon will send ON and OFF
values to corresponding topic.
Current version of MQTT/UDP does not support QoS, and, possiblly on/off message can be lost. That is why this function is marked as experimental.
Running¶
In any OS you will need MqttUdpTray.jar
and mqttudptray.ini
. There is MqttUdpTray.exe
for windows. In other systems (with Java 8 installed) please execute javaw -jar MqttUdpTray.jar
or java -jar MqttUdpTray.jar
command. All the files are in the build
directory.
Addendums¶
Cook Book¶
Even if you think that MQTT/UDP is not for you and can’t be used as primary transport in your project, there are other possibilities to use it together with traditional IoT infrastructure
Displays¶
Send a copy of all the items state to MQTT/UDP and use it to bring data to hardware and software displays. For example, this
project includes an example program (tools/tray
directory, see figure Tray icon on mouse over ) to display some MQTT/UDP items via an icon in a desktop
tray. Being a Java program it should work in Windows, MacOS and Unix.
Sensors and integrations¶
It is not really easy to write a native Java connector for OpenHAB. Write it in Python for MQTT/UDP and translate data from MQTT/UDP to OpenHAB. It is really easy.
By the way, there is quite a lot of sensors drivers in Python for Raspberry and clones.
Don’t like Raspberry? Use Arduino or some ARM CPU unit and C version of MQTT/UDP.
Sketches¶
There are more or less complete demo implementations exist.
Wemos D1 Mini Pro¶
This sketh must also run on any NodeMCU hardware.
See lang/lua/nodemcu for source code and instruction.
Arduino¶
This sketch must run on any Arduino device as long as it has ENC28J60 ethernet module connected.
See lang/arduino for source code and instructions for this one.
Network¶
Note
Basic digital signature subsystem for MQTT/UDP is in development now. Java implementation is already
supporting it, contact us if you want to test it or take part in development.
See ru.dz.mqtt_udp.Engine
class.
Current implementation of MQTT/UDP has no security support. It is supposed that later some kind of packet digital signature will be added. At the moment I suppose that protocol can be used in comletely secure networks or for not really important data.
Actually I personally use MQTT/UDP in typical home network, separated from Internet with NAT but with no separation between smart home and other computers. I do think that would my home network be hacked into, intervention into the smart home system is the lesser of possible evils.
Work in progress¶
There are parts of protocol or additional components design that not finished completely, and are subject of discussion.
QoS Server¶
Nowadays UDP is quite reliable (typical loss rate is < 0.5% even in a busy network), but not 100% reliable. Things are easy in point to point communications. Send a message, wait for acknowledgement, resend if none received in a reasonable time. But MQTT/UDP is broadcast protocol. Do we have to wait for ack from all nodes in a network? Some nodes? Which ones?
It is makes sense that we can build a map of nodes that listen to us by collecting their responces. But we want to keep MQTT/UDP implementation simple and this is not that simple. And not any node needs such high a reliability.
The idea is to add separate server on a network that will build lists of listeners for each topic, collect low-QoS ack packets and sent one high-QoS ack packet to topic publisher(s).
Note that such server is not a single point of failure. First of all, there can be more than one instance of QoS server. Second, even if QoS server fails, nodes continue to send data. Though, each packet will be resent few times, but it is not a communications failure. Last, but not least, sending node can stop resending after few acks with lower QoS. For example, sending node can take for acknowledge one QoS 3 ack message and 2 or 3 QoS 2 ones.
FAQ¶
Q: There’s MQTT-SN, aren’t you repeating it?
- A:
- MQTT-SN still needs broker. And MQTT/UDP still simpler. :)
Q: Why such a set of languages?
- A:
C is for embedded use. I want it to be easy to build smert sensor or wall display/control unit based on MQTT/UDP.
Python is for gateways and scripting. Writing small command line program or daemon in Python is easy. Also, there is a lot of Python drivers for various sensors and displays on Raspberry/Orange/Banana/whatever PI.
Java is for serious programming and GUI apps. Viewer was easy thing to do with JavaFX.
Lua is for NodeMCU and, possibly, other embedded platforms.
CodeSys is evil you can’t escape.
Links¶
GitHUb: https://github.com/dzavalishin/mqtt_udp
Error reports and feature requests: https://github.com/dzavalishin/mqtt_udp/issues
If you use MQTT/UDP, please let me know by adding issue at GitHub. :)
Additional repositories¶
These projects use or extend MQTT/UDP.
- CCU825 connector, Java
- https://github.com/dzavalishin/ccu825modbus
- AtMega128 IO Unit
- https://github.com/dzavalishin/smart-home-devices/tree/master/mmnet_mqt_udp_server