MQTT-Broker in Java einbinden

Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:

<dependency>
	<groupId>io.moquette</groupId>
	<artifactId>moquette-broker</artifactId>
	<version>0.12.1</version>
</dependency>

Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.

public final class Broker {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    final Server mqttBroker = new Server();

    public void startServer() {

        // Load class path for configuration
        IResourceLoader classpathLoader = new ClasspathResourceLoader();
        final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

        // Start MQTT broker
        log.info("Start MQTT broker...");
        List userHandlers = Collections.singletonList(new PublisherListener());

        try {
            mqttBroker.startServer(classPathConfig, userHandlers);
        } catch (IOException e) {
            log.error("MQTT broker start failed...");
        }

        // Wait before publish topics
        log.info("Wait before topics are pushed...");

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            log.warn("Pause for publishing topics interupted.");
        }

        // Publishing topics
        log.info("Pushing topics...");

        pushTopic("/exit");

        log.info("Topics pushed...");
    }

    public void stopServer() {
        mqttBroker.stopServer();
    }

    public void pushTopic(String topic) {

        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .retained(true)
                .qos(MqttQoS.EXACTLY_ONCE)
                .payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8)))
                .build();

        mqttBroker.internalPublish(message, "INTRLPUB");
    }
}

Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:

##############################################
#  Moquette configuration file. 
#
#  The syntax is equals to mosquitto.conf
# 
##############################################

port 1883

#websocket_port 8080

host 0.0.0.0

#Password file
#password_file password_file.conf

#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv

allow_anonymous true

Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:

public class PublisherListener extends AbstractInterceptHandler {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    @Override
    public String getID() {
        return "PublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {

        // Create array for payload
        int readableBytes = msg.getPayload().readableBytes();
        byte[] payload = new byte[readableBytes];

        // Read bytes from payload
        for (int i = 0; i < readableBytes; i++) {
            payload[i] = msg.getPayload().readByte();
        }

        // Create string from payload
        String decodedPayload = new String(payload, UTF_8);
        log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    }
}

Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:

// Start broker
Broker broker = new Broker();
broker.startServer();

// Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

	log.info("Stopping MQTT broker...");
	broker.stopServer();
}));

Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.

Java Exceptions in Strings verpacken

Wenn in einem Java-Programm etwas schiefläuft so, wird in den meisten Fällen eine Exception erzeugt und diese nach oben im Stack durchgereicht, bis sie behandelt wird. Manchmal ist es notwendig die Exception bzw. Teile davon in einen String umzuwandeln. Natürlich kann die Exception nun mühsam von Hand auseinander genommen und in einen String konvertiert werden. Sinnvoller ist es die Hilfsmethoden aus der Utility-Bibliothek Guava zu nutzen:

String stacktrace = Throwables.getStackTraceAsString(e);

Bei der Methode getStackTraceAsString wird der Stacktrace der betreffenden Exception in einen String umgewandelt. Neben dieser Methode existieren weitere Methoden rund um das Exception-Handling in der Klasse Throwables. Einfacher wird das Ganze, wenn ein Logger genutzt wird und in diesem die Exception auftauchen soll:

Logger logger = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());
logger.error("Exception: ", e);

In diesem Fall bieten die meisten Logger die Möglichkeit an, das Throwable direkt als Parameter zu übergeben. Die weitere Verarbeitung der Exception wird anschließend vom Logger erledigt.

Klassenname für Logger unter Java ermitteln

Wenn ein Logger unter Java erstellt wird, so wird diesem in den meisten Fällen ein Name übergeben, über den er später identifiziert und konfiguriert werden kann. Mit der Simple Logging Facade for Java kurz SLF4J würde das Ganze so aussehen:

Logger logger = LoggerFactory.getLogger("Testclass");

Die erste Möglichkeit wäre es nun den Namen der Klasse, über die Klasseninformationen mittels:

Logger logger = LoggerFactory.getLogger(Testclass.class.getName());

zu ermitteln. Alternativ kann diese Klasseninformation direkt in die Methode gegeben werden:

Logger logger = LoggerFactory.getLogger(Testclass.class);

Alle diese Methoden haben den Nachteil das die jeweilige Klasse in irgendeiner Form übergeben werden muss. Wenn nun dieser Logger in eine andere Klasse kopiert wird um ihn dort ebenfalls zu nutzen muss der Name der Klasse angepasst werden. Anders funktioniert das Ganze mit der folgenden Methode:

Logger logger = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

Bei diesem Beispiel wird eine neue Exception instanziiert und aus dem Stacktrace dieser Exception der Name der Klasse ermittelt. Somit kann diese Zeile von Klasse zu Klasse kopiert werden, ohne das eine Anpassung notwendig wird.