MQTT-Broker in Java einbinden

Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:

<dependency>
	<groupId>io.moquette</groupId>
	<artifactId>moquette-broker</artifactId>
	<version>0.12.1</version>
</dependency>

Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.

public final class Broker {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    final Server mqttBroker = new Server();

    public void startServer() {

        // Load class path for configuration
        IResourceLoader classpathLoader = new ClasspathResourceLoader();
        final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

        // Start MQTT broker
        log.info("Start MQTT broker...");
        List userHandlers = Collections.singletonList(new PublisherListener());

        try {
            mqttBroker.startServer(classPathConfig, userHandlers);
        } catch (IOException e) {
            log.error("MQTT broker start failed...");
        }

        // Wait before publish topics
        log.info("Wait before topics are pushed...");

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            log.warn("Pause for publishing topics interupted.");
        }

        // Publishing topics
        log.info("Pushing topics...");

        pushTopic("/exit");

        log.info("Topics pushed...");
    }

    public void stopServer() {
        mqttBroker.stopServer();
    }

    public void pushTopic(String topic) {

        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .retained(true)
                .qos(MqttQoS.EXACTLY_ONCE)
                .payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8)))
                .build();

        mqttBroker.internalPublish(message, "INTRLPUB");
    }
}

Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:

##############################################
#  Moquette configuration file. 
#
#  The syntax is equals to mosquitto.conf
# 
##############################################

port 1883

#websocket_port 8080

host 0.0.0.0

#Password file
#password_file password_file.conf

#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv

allow_anonymous true

Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:

public class PublisherListener extends AbstractInterceptHandler {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    @Override
    public String getID() {
        return "PublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {

        // Create array for payload
        int readableBytes = msg.getPayload().readableBytes();
        byte[] payload = new byte[readableBytes];

        // Read bytes from payload
        for (int i = 0; i < readableBytes; i++) {
            payload[i] = msg.getPayload().readByte();
        }

        // Create string from payload
        String decodedPayload = new String(payload, UTF_8);
        log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    }
}

Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:

// Start broker
Broker broker = new Broker();
broker.startServer();

// Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

	log.info("Stopping MQTT broker...");
	broker.stopServer();
}));

Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.

Server gebraucht kaufen

Ab und an baue ich mir einen Server für den Heimgebrauch zusammen. Dieser muss dabei nicht frisch vom Band kommen, sondern kann durchaus gebraucht sein. Natürlich hat man beim privaten Gebrauchtkauf dass Problem das man nicht ganz genau weiß was man bekommt.

Einer der Server der Serverschmiede

Die Vorteile des Gebrauchtkaufes, ohne die meisten der Nachteile bietet der Händler Serverschmiede.com GmbH, dessen Webseite unter der gleichnamigen Domain serverschmiede.com zu finden ist. Dort werden aufgearbeitete Server und deren Komponenten angeboten und kann sich so schnell einen Server zusammenstellen, der preislich wesentlich unter dem Neupreis liegt. Daneben erhält man auf Wunsch eine bis zu 36-monatige Garantie. Für den Hausgebrauch und die eine oder andere Firma ist dies sicherlich eine gangbare Alternative zu einem Neukauf.