MQTT-Broker in Java einbinden

Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:

<dependency>
	<groupId>io.moquette</groupId>
	<artifactId>moquette-broker</artifactId>
	<version>0.12.1</version>
</dependency>

Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.

public final class Broker {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    final Server mqttBroker = new Server();

    public void startServer() {

        // Load class path for configuration
        IResourceLoader classpathLoader = new ClasspathResourceLoader();
        final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

        // Start MQTT broker
        log.info("Start MQTT broker...");
        List userHandlers = Collections.singletonList(new PublisherListener());

        try {
            mqttBroker.startServer(classPathConfig, userHandlers);
        } catch (IOException e) {
            log.error("MQTT broker start failed...");
        }

        // Wait before publish topics
        log.info("Wait before topics are pushed...");

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            log.warn("Pause for publishing topics interupted.");
        }

        // Publishing topics
        log.info("Pushing topics...");

        pushTopic("/exit");

        log.info("Topics pushed...");
    }

    public void stopServer() {
        mqttBroker.stopServer();
    }

    public void pushTopic(String topic) {

        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .retained(true)
                .qos(MqttQoS.EXACTLY_ONCE)
                .payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8)))
                .build();

        mqttBroker.internalPublish(message, "INTRLPUB");
    }
}

Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:

##############################################
#  Moquette configuration file. 
#
#  The syntax is equals to mosquitto.conf
# 
##############################################

port 1883

#websocket_port 8080

host 0.0.0.0

#Password file
#password_file password_file.conf

#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv

allow_anonymous true

Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:

public class PublisherListener extends AbstractInterceptHandler {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    @Override
    public String getID() {
        return "PublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {

        // Create array for payload
        int readableBytes = msg.getPayload().readableBytes();
        byte[] payload = new byte[readableBytes];

        // Read bytes from payload
        for (int i = 0; i < readableBytes; i++) {
            payload[i] = msg.getPayload().readByte();
        }

        // Create string from payload
        String decodedPayload = new String(payload, UTF_8);
        log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    }
}

Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:

// Start broker
Broker broker = new Broker();
broker.startServer();

// Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

	log.info("Stopping MQTT broker...");
	broker.stopServer();
}));

Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.

Benchmarking mit dem JMH-Framework

Benchmarking unter Java hat mit einigen Problemen zu kämpfen. So optimiert die Java-Laufzeitumgebung den Quellcode, je nachdem wie oft er benutzt wird. Wenn nun kleinere Dinge getestet werden sollen, wie z.B. ob Methode A oder B besser funktioniert so wird dies problematisch. Am Anfang würde besagte Methoden nur interpretiert werden, anschließend würden sie bei häufiger Benutzung kompiliert werden und bei noch häufigerer Benutzung weiter optimiert werden. Daneben kann es passieren dass der Benchmark komplett wegoptimiert wird, da die JVM unter Umständen erkennt, dass die genutzten Werte bzw. die Ergebnisse des Benchmarks später nicht mehr genutzt werden.

Die offizielle Seite des Java Microbenchmark Harness-Framework

Damit sich ein Entwickler nicht immer und immer wieder mit solchen Problemen beim Benchmarking herumschlagen muss, wird seit einigen Jahren im Rahmen des OpenJDK-Projektes an dem Java Microbenchmark Harness-Framework gearbeitet. Das JMH-Framework nimmt dem Entwickler viele Aufgaben ab, um diese Probleme zu lösen. So kann bzw. wird die JVM durch JMH aufgewärmt werden und auch die Messung der Zeiten übernimmt JMH. Um JMH zu nutzen, müssen die entsprechenden Abhängigkeiten (in diesem Fall über die Maven-POM-Datei) dem Projekt hinzugefügt werden:

<dependency>
	<groupId>org.openjdk.jmh</groupId>
	<artifactId>jmh-core</artifactId>
	<version>1.21</version>
</dependency>
<dependency>
	<groupId>org.openjdk.jmh</groupId>
	<artifactId>jmh-generator-annprocess</artifactId>
	<version>1.21</version>
</dependency>

Sind die Abhängigkeiten eingebunden, kann mit dem eigentlichen Aufbau des Benchmarks begonnen werden. Jede Methode, welche ein Benchmark durchführen möchte, muss mit der @Benchmark-Annotation gekennzeichnet werden:

@Benchmark
public int addInts() {
    return aInt + bInt;
}

Die Klasse, in welcher die Benchmarks definiert sind, muss mit einigen weiteren Annotationen ausgestattet werden:

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AddingBenchmark {
...

Die @State(Scope.Benchmark)-Annotation in diesem Beispiel sorgt dafür das die Felder im Rahmen des Benchmarks für die Benchmark-Methoden zur Verfügung stehen. Mit der @BenchmarkMode-Annotation wird definiert, welche Art von Messung vorgenommen werden soll. In diesem Fall wird die durchschnittliche Zeit für eine einzelne Operation gemessen und ausgegeben. Über die @OutputTimeUnit-Annotation wird definiert wie die Zeiten für die Messungen ausgegeben werden. Werden für den Benchmark Daten benötigt so können sie über eine Methode, welche mit der @SetupAnnotation versehen wird, erzeugt werden:

@Setup
public void setup() {
    ...
}

Die setup-Methode wird damit vor dem eigentlichen Benchmark ausgeführt und somit können benötigte Daten dort erzeugt werden. Damit der Benchmark nun ausgeführt werden kann muss das Java Microbenchmark Harness-Framework angestartet werden. Dafür wird eine Klasse geschrieben und mit einer main-Methode versehen:

package net.seeseekey.JavaBenchmark;

import org.openjdk.jmh.runner.RunnerException;

import java.io.IOException;

public class Runner {

    public static void main(String[] args) throws IOException, RunnerException {
        
        org.openjdk.jmh.Main.main(args);
    }
}

Über die main-Methode wird die main-Methode des Frameworks gestartet und damit wird das Benchmark durchgeführt. Alternativ kann die Methode org.openjdk.jmh.Main.main auch direkt in der Maven-POM-Datei als Startklasse definiert werden:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.openjdk.jmh.Main.main</mainClass>
            </manifest>

Nachdem der Benchmark implementiert ist, kann dieser natürlich über die IDE ausgeführt werden. Das würde allerdings nicht ganz der Intention von JMH entsprechen. Stattdessen sollte eine JAR aus dem Projekt erzeugt werden und diese, auf einem möglichst unbelastetem System, über die Konsole ausgeführt werden:

java -jar benchmark.jar

Damit wird der Benchmark über JMH ausgeführt. JMH beginnt mit einer Warmup-Phase und führt anschließend den eigentlichen Benchmark durch. Nach einigen Minuten erhält der Entwickler die Auswertung des Benchmark:

Benchmark                                 Mode  Cnt   Score    Error  Units
JavaBenchmark.AddingBenchmark.addDoubles  avgt   25  ≈ 10⁻⁵           ms/op
JavaBenchmark.AddingBenchmark.addInts     avgt   25  ≈ 10⁻⁵           ms/op

In diesem Fall sieht der Entwickler bei der Ausgabe, dass die Ausgabeeinheit für die Zeit mit Millisekunden zu grob gewählt wurde und stattdessen besser Nanosekunden genutzt werden sollten.

Ausführbare jar-Datei mittels Maven generieren

Für bestimmte Java-Projekte ist es durchaus praktisch als Ergebnis eine jar-Datei inklusive der benötigten Abhängigkeiten zu generieren. So kann das Projekt in Form einer einzelnen Datei unkompliziert ausgeliefert werden. Mittels Maven lässt sich dies relativ einfach bewerkstelligen. Dabei wird eine Basis-POM (pom.xml) benötigt, in welcher das Plugin maven-assembly-plugin konfiguriert wird:

<plugin>
	<artifactId>maven-assembly-plugin</artifactId>
	<version>3.1.0</version>
	<configuration>
		<archive>
			<manifest>
				<mainClass>com.example.webservice.Webservice</mainClass>
			</manifest>
		</archive>
		<descriptorRefs>
			<descriptorRef>jar-with-dependencies</descriptorRef>
		</descriptorRefs>
	</configuration>
</plugin>

Soll nach erfolgter Entwicklung die jar-Datei mit den Abhängigkeiten erzeugt werden, muss das Kommando:

mvn clean compile assembly:single

genutzt werden. Damit das Ganze auch beim gewöhnlichen:

mvn package

funktioniert, muss die pom.xml angepasst werden. Dabei wird zur Plugin-Definition ein executions-Block hinzugefügt, welcher dafür sorgt das dass Ziel single auch während der Phase package ausgeführt wird. Die komplette pom.xml sieht dann wie folgt aus:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>webservice</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.webservice.Webservice</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Alle im dependencies-Block verwendeten Abhängigkeiten, werden somit in die jar-Datei übernommen.