MQTT – eine Einführung

Wer Daten von A nach B übermitteln möchte, hat unzählige Möglichkeiten dies zu tun. Je nach Anforderung und Anwendungsfall sieht die ideale Möglichkeit der Datenübermittlung anders aus. Für die Kommunikation zwischen Sensoren und im IoT-Bereich hat das Protokoll MQTT den Standard gesetzt. Immer dort wo verteilte Systeme miteinander kommunizieren müssen, eignet sich MQTT. MQTT steht für Message Queue Telemetry Transport und ist architektonisch relativ einfach aufgebaut. MQTT ist Nachrichten-orientiert und zentralisiert.

Die MQTT-Architektur

Für MQTT wird ein Broker benötigt, die zentrale Instanz an welche alle Clients ihre Nachrichten senden und sie von diesem Broker empfangen. Im Umkehrschluss bedeutet dies, das sich die Clients untereinander nicht kennen. Die gesamte Kommunikation läuft über dem Broker ab. Die Nachrichten werden nicht einfach wahllos an den Broker geschickt, sondern an ein sogenanntes Topic, ein Thema z.B. bad/lichtsensor1. Diese Topics sind hierarchisch aufgebaut und werden wie ein Pfad, mit einem Slash als Trennzeichen, definiert. Die Topics können von anderen Clients abonniert werden, so das diese bei einer neuen Nachricht betreffend des Topics informiert werden. Durch den hierarchischen Aufbau ist es möglich alle Topics einer bestimmten Hierarchieebene zu abonnieren. So würde der Topic:

bad/#

alle Untertopics von bad abonnieren. Eine weitere Möglichkeit ist es an einer bestimmten Stelle im Pfad das Sonderzeichen Plus zu benutzen:

+/lautsprecher/

In diesem Beispiel würden alle Lautsprecher aller Zimmer abonniert, so wären bad/lautsprecher als auch wohnzimmer/lautsprecher abgedeckt.

Nach der Verbindung mit einer CONNECT-Nachricht antwortet der Broker mit einer CONNACK-Nachricht. Damit ist die Verbindung etabliert. Der Client abonniert in dem Beispiel (siehe Bild) den Topic bad/lichtsensor1. Nun kann der Broker den Client über Nachrichten dieses Topic betreffend informieren. Wenn eine solche Nachricht eingeht, reagiert der Client, indem er auf dem Topic bad/lautsprecher die Nachricht bzw. den Wert on hinterlässt. Soll die Verbindung später wieder beendet werden, so wird eine DISCONNECT-Nachricht gesendet.

Die Kommunikation zwischen Client und Broker

Wenn ein Client keine aktive Verbindung zum Broker hat und eine neue Nachricht auf einem Topic aufläuft, auf welches der Client ein Abonnement hält, so verpasst er diese Nachricht. Anders sieht es aus, wenn der Sender beim Senden der Nachricht das sogenannte Retain-Flag für die Nachricht gesetzt hat. In diesem Fall stellt der Broker die Nachricht später noch zu.

Für den Fall, das die Verbindung vom Client nicht beendet wird bzw. beendet werden kann, existiert in MQTT ein sogenanntes Testament. So kann es bei Sensoren, die über Batterien gespeist sind durchaus vorkommen, dass die Verbindung plötzlich abbricht. Deshalb kann ein Client ein Testament, eine Nachricht auf ein Topic, hinterlegen, welche im Falle des Verbindungsabbruches vom Broker über das Topic versendet wird.

In der Praxis existieren zwei Versionen von MQTT: MQTT 3 welches die größte Basis besitzt und MQTT 5, welches mit einigen Neuerungen aufwarten kann, um das Protokoll fit für die Zukunft zu machen. Spezifiziert wird MQTT von OASIS, der Organization for the Advancement of Structured Information Standards. Die größten Unterschiede zwischen der letzten 3er-Version 3.1.1 und 5 sind Shared Subscriptions, welche Load Balancing auf Clientseite erlauben, Negative Acks eine Art Statuscodes ähnlich den HTTP-Statuscodes und User Properties, bei denen es sich um die Entsprechung zu den HTTP-Headern handelt. Diese können unter anderem für Metadaten genutzt werden. Ergeben haben sich diese Neuerungen hauptsächlich durch die Rückmeldungen aus der Community über die letzten Jahre.

Das OSI-Modell

Technisch basiert das MQTT-Protokoll auf TCP/IP. Dabei werde die Ports 1883 und 8883 genutzt. Der erste Port ist für die unverschlüsselte, der zweite Port für die verschlüsselte Kommunikation reserviert. Im OSI-Schichtenmodell befindet sich MQTT, wie HTTP auf dem Application Layer (OSI Layer 7). Im Gegensatz zu HTTP ist MQTT bleibt die Verbindung bei MQTT auch bestehen, wenn keine Daten übertragen werden.

Gestartet wird die Kommunikation des Clients mit einer CONNECT-Nachricht, woraufhin der Broker das Ganze mit einer CONNACK-Nachricht bestätigt. Nun kann der Client Topics abonnieren und Informationen zu einem Topic senden (PUBLISH).

MQTT beherrscht drei verschiedene Stufen des Quality of Service (QoS). Stufe 0 ist vom Modell her Fire-and-Forgot; die Nachricht wird einmal versendet und danach vom Broker vergessen. Ob sie ankommt, ist auf dieser QoS-Stufe nicht relevant. Bei Stufe 1 garantiert der Broker das die Nachricht mindestens einmal zugestellt wird, sie kann aber durchaus auch mehrfach bei den Clients ankommen. Stufe 2 hingegen garantiert, dass die Nachricht exakt einmal ankommt. Offiziell sind die QoS-Stufen wie folgt benannt:

At most once (0)
At least once (1)
Exactly once (2)

Je nach gewählter QoS-Stufe kommt es zu vermehrter Kommunikation über das MQTT-Protokoll. Bei Stufe 1 würde die Gegenstelle nach einer PUBLISH-Nachricht mit einer PUBACK-Nachricht antworten. Ohne eine solche Nachricht wird bei Stufe 1 der Sendevorgang so lange wiederholt, bis er von der Gegenseite bestätigt wurde. Deshalb ist nicht sichergestellt das die Nachricht nur einmal ankommt.

Wenn dies nicht gewünscht ist, kann stattdessen die QoS-Stufe 2 genutzt werden. Hier wird in der PUBLISH-Nachricht vom Sender eine ID mitgegeben. Nach dem Empfang wird die Nachricht gespeichert, aber noch nicht verarbeitet. Der Empfänger sendet eine PUBREC-Nachricht mit der ID an den Sender. Erhält der Sender diese Nachricht, sendet er eine Release-Nachricht (PUBREL), an den Empfänger. Als letzte Aktion sendet Empfänger ein PUBCOMP-Nachricht an den Sender und verarbeitet anschließend die Nachricht. Der Sender löscht beim Empfang der PUBCOMP-Nachricht die Nachricht.

Hintergrund für die unterschiedlichen Qualitätsstufen ist der Ressourcenverbrauch; je niedriger die QoS-Stufe um so weniger Ressourcen wie Zeit, Speicher und CPU, werden benötigt, um die Nachricht zu verarbeiten.

An Brokern und Client-Bibliotheken mangelt es MQTT nicht. Zu den bekanntesten Brokern gehören Mosquitto, HiveMQ und VerneMQ. Im Produktivbetrieb muss auf eine Absicherung der Topics geachtet werden. So ist es je nach Broker möglich das diese eine Authentifizierung der Clients verlangen und erst dann den Zugriff auf die Topics erlauben. Client-Bibliotheken für MQTT gibt es wie Sand am Meer, für unterschiedlichste Systeme wie Arduino, C, Java, .NET, Go und viele weitere Sprachen und Frameworks.

MQTTBox unter macOS

Daneben existieren grafische Client, welche die Nachrichten auf dem Broker anzeigen und die Interaktion mit einem Broker ermöglichen. Zu diesen Clients gehören unter anderem MQTT.fx, mqtt-spy und MQTTBox.

Daten unter Java in ein Byte-Array umwandeln

Für ein Projekt wollte ich eine Liste von long-Werten unter Java in ein einzelnes Byte-Array umwandeln. Dafür kann unter Java die Klasse ByteBuffer genutzt werden. In diesem Beispiel wird eine Liste namens dataStream definiert:

List dataStream = new ArrayList<>();

In dieser Liste befinden sich eine unbestimmte Anzahl von Werten. Mit der Klasse ByteBuffer kann diese Liste bzw. die Werte in der Liste in ein Byte-Array überführt werden:

ByteBuffer byteBuffer = ByteBuffer.allocate(dataStream.size() * 8);

for(long value: dataStream) {
	byteBuffer.putLong(value);
}

byte[] array = byteBuffer.array();

Im ersten Schritt wird der ByteBuffer mit einer Kapazität angelegt, welcher der Größe der long-Werte in Byte entspricht. Anschließend werden die long-Werte in den ByteBuffer geschrieben. Ist dies geschehen kann ein Byte-Array aus dem ByteBuffer erstellt werden. Damit ist die Umwandlung abgeschlossen.

Code Snippets über Postman generieren

Mit der App Postman ist es möglich REST-API Aufrufe gegen beliebige Endpunkte durchzuführen. Allerdings bietet Postman weitere Funktionalität, welche vom normalen Tagesgeschäft der App abweicht. Eine dieser Funktionalitäten ist der Code Snippet Generator.

Code Snippets können für unterschiedliche Sprachen und Frameworks erzeugt werden

Mit diesem Generator kann ein beliebiger Request in Quellcode umgewandelt werden. Der Generator unterstützt unterschiedliche Programmiersprachen und Frameworks. Für Java würde, unter Nutzung der OK HTTP Bibliothek, das Ganze so aussehen:

OkHttpClient client = new OkHttpClient();

MediaType mediaType = MediaType.parse("text/plain");
RequestBody body = RequestBody.create(mediaType, "{\r\n\tfield: \"data\",\r\n\tfield2: \"data\",\r\n\tfield3: \"data\"\r\n}");
Request request = new Request.Builder()
  .url("https://example.com")
  .post(body)
  .addHeader("HeaderField", "headerValue")
  .addHeader("Content-Type", "text/plain")
  .addHeader("User-Agent", "PostmanRuntime/7.11.0")
  .addHeader("Accept", "*/*")
  .addHeader("Cache-Control", "no-cache")
  .addHeader("Postman-Token", "7dda208f-ba63-467d-99cd-98455c2b3a7a,9125dbf4-cd5c-4070-87e3-fcda7416ca08")
  .addHeader("Host", "example.com")
  .addHeader("accept-encoding", "gzip, deflate")
  .addHeader("content-length", "56")
  .addHeader("Connection", "keep-alive")
  .addHeader("cache-control", "no-cache")
  .build();

Response response = client.newCall(request).execute();

Erreichbar ist das Feature über den Code-Link, welcher unter dem Send-Button des Hauptfensters zu finden ist. Nach einem Klick auf den Link erscheint ein Dialog in welchem die gewünschte Sprach- und Frameworkkombination ausgewählt werden kann.

Unter dem Send-Button ist der Code-Link zu finden

Als Sprachen für die Generierung von Code Snippets werden unter anderem C#, Go, Java, JavaScript, Objective-C, PHP, Python und Swift unterstützt. Daneben werden bestimmte Tools wie curl und wget unterstützt. Postman selber kann unter getpostman.com bezogen werden.

Benchmarking mit dem JMH-Framework

Benchmarking unter Java hat mit einigen Problemen zu kämpfen. So optimiert die Java-Laufzeitumgebung den Quellcode, je nachdem wie oft er benutzt wird. Wenn nun kleinere Dinge getestet werden sollen, wie z.B. ob Methode A oder B besser funktioniert so wird dies problematisch. Am Anfang würde besagte Methoden nur interpretiert werden, anschließend würden sie bei häufiger Benutzung kompiliert werden und bei noch häufigerer Benutzung weiter optimiert werden. Daneben kann es passieren dass der Benchmark komplett wegoptimiert wird, da die JVM unter Umständen erkennt, dass die genutzten Werte bzw. die Ergebnisse des Benchmarks später nicht mehr genutzt werden.

Die offizielle Seite des Java Microbenchmark Harness-Framework

Damit sich ein Entwickler nicht immer und immer wieder mit solchen Problemen beim Benchmarking herumschlagen muss, wird seit einigen Jahren im Rahmen des OpenJDK-Projektes an dem Java Microbenchmark Harness-Framework gearbeitet. Das JMH-Framework nimmt dem Entwickler viele Aufgaben ab, um diese Probleme zu lösen. So kann bzw. wird die JVM durch JMH aufgewärmt werden und auch die Messung der Zeiten übernimmt JMH. Um JMH zu nutzen, müssen die entsprechenden Abhängigkeiten (in diesem Fall über die Maven-POM-Datei) dem Projekt hinzugefügt werden:

<dependency>
	<groupId>org.openjdk.jmh</groupId>
	<artifactId>jmh-core</artifactId>
	<version>1.21</version>
</dependency>
<dependency>
	<groupId>org.openjdk.jmh</groupId>
	<artifactId>jmh-generator-annprocess</artifactId>
	<version>1.21</version>
</dependency>

Sind die Abhängigkeiten eingebunden, kann mit dem eigentlichen Aufbau des Benchmarks begonnen werden. Jede Methode, welche ein Benchmark durchführen möchte, muss mit der @Benchmark-Annotation gekennzeichnet werden:

@Benchmark
public int addInts() {
    return aInt + bInt;
}

Die Klasse, in welcher die Benchmarks definiert sind, muss mit einigen weiteren Annotationen ausgestattet werden:

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AddingBenchmark {
...

Die @State(Scope.Benchmark)-Annotation in diesem Beispiel sorgt dafür das die Felder im Rahmen des Benchmarks für die Benchmark-Methoden zur Verfügung stehen. Mit der @BenchmarkMode-Annotation wird definiert, welche Art von Messung vorgenommen werden soll. In diesem Fall wird die durchschnittliche Zeit für eine einzelne Operation gemessen und ausgegeben. Über die @OutputTimeUnit-Annotation wird definiert wie die Zeiten für die Messungen ausgegeben werden. Werden für den Benchmark Daten benötigt so können sie über eine Methode, welche mit der @SetupAnnotation versehen wird, erzeugt werden:

@Setup
public void setup() {
    ...
}

Die setup-Methode wird damit vor dem eigentlichen Benchmark ausgeführt und somit können benötigte Daten dort erzeugt werden. Damit der Benchmark nun ausgeführt werden kann muss das Java Microbenchmark Harness-Framework angestartet werden. Dafür wird eine Klasse geschrieben und mit einer main-Methode versehen:

package net.seeseekey.JavaBenchmark;

import org.openjdk.jmh.runner.RunnerException;

import java.io.IOException;

public class Runner {

    public static void main(String[] args) throws IOException, RunnerException {
        
        org.openjdk.jmh.Main.main(args);
    }
}

Über die main-Methode wird die main-Methode des Frameworks gestartet und damit wird das Benchmark durchgeführt. Alternativ kann die Methode org.openjdk.jmh.Main.main auch direkt in der Maven-POM-Datei als Startklasse definiert werden:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.openjdk.jmh.Main.main</mainClass>
            </manifest>

Nachdem der Benchmark implementiert ist, kann dieser natürlich über die IDE ausgeführt werden. Das würde allerdings nicht ganz der Intention von JMH entsprechen. Stattdessen sollte eine JAR aus dem Projekt erzeugt werden und diese, auf einem möglichst unbelastetem System, über die Konsole ausgeführt werden:

java -jar benchmark.jar

Damit wird der Benchmark über JMH ausgeführt. JMH beginnt mit einer Warmup-Phase und führt anschließend den eigentlichen Benchmark durch. Nach einigen Minuten erhält der Entwickler die Auswertung des Benchmark:

Benchmark                                 Mode  Cnt   Score    Error  Units
JavaBenchmark.AddingBenchmark.addDoubles  avgt   25  ≈ 10⁻⁵           ms/op
JavaBenchmark.AddingBenchmark.addInts     avgt   25  ≈ 10⁻⁵           ms/op

In diesem Fall sieht der Entwickler bei der Ausgabe, dass die Ausgabeeinheit für die Zeit mit Millisekunden zu grob gewählt wurde und stattdessen besser Nanosekunden genutzt werden sollten.

Tastatur und Maus unter Java fernsteuern

Für bestimmte Automatisierungsaufgaben ist es manchmal nötig die Tastatur und die Maus eines Rechners fernzusteuern. Unter Java kann dies mit der Klasse Robot aus dem Package java.awt erledigt werden. Ein einfaches Beispiel zur Nutzung sieht dabei wie folgt aus:

// Create robot
Robot robot = new Robot();

// Move mouse and make a mouse click, then wait
robot.mouseMove(1000, 1000);
robot.mousePress(InputEvent.BUTTON1_MASK);
robot.mouseRelease(InputEvent.BUTTON1_MASK);
robot.delay(1000);

// Press key a, then wait
robot.keyPress(KeyEvent.VK_A);
robot.keyRelease(KeyEvent.VK_A);
robot.delay(1000);

In diesem Beispiel wird zunächst eine Instanz der Klasse Robot angelegt. Anschließend wird die Maus bewegt und die linke Maustaste gedrückt und wieder losgelassen. Danach wartet der Robot eine Sekunde, um anschließend die Taste A zu drücken. Neben den im Beispiel gezeigten Funktionalität kann unter anderem das Mausrad gesteuert werden. Auch Methoden um einen Pixel auszulesen oder größere Bereiche des Bildschirmes sind in der Klasse zu finden.