Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:
<dependency> <groupId>io.moquette</groupId> <artifactId>moquette-broker</artifactId> <version>0.12.1</version> </dependency>
Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.
public final class Broker { private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName()); final Server mqttBroker = new Server(); public void startServer() { // Load class path for configuration IResourceLoader classpathLoader = new ClasspathResourceLoader(); final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); // Start MQTT broker log.info("Start MQTT broker..."); List extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener()); try { mqttBroker.startServer(classPathConfig, userHandlers); } catch (IOException e) { log.error("MQTT broker start failed..."); } // Wait before publish topics log.info("Wait before topics are pushed..."); try { Thread.sleep(20000); } catch (InterruptedException e) { log.warn("Pause for publishing topics interupted."); } // Publishing topics log.info("Pushing topics..."); pushTopic("/exit"); log.info("Topics pushed..."); } public void stopServer() { mqttBroker.stopServer(); } public void pushTopic(String topic) { MqttPublishMessage message = MqttMessageBuilders.publish() .topicName(topic) .retained(true) .qos(MqttQoS.EXACTLY_ONCE) .payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8))) .build(); mqttBroker.internalPublish(message, "INTRLPUB"); } }
Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:
############################################## # Moquette configuration file. # # The syntax is equals to mosquitto.conf # ############################################## port 1883 #websocket_port 8080 host 0.0.0.0 #Password file #password_file password_file.conf #ssl_port 8883 #jks_path serverkeystore.jks #key_store_password passw0rdsrv #key_manager_password passw0rdsrv allow_anonymous true
Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:
public class PublisherListener extends AbstractInterceptHandler { private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName()); @Override public String getID() { return "PublishListener"; } @Override public void onPublish(InterceptPublishMessage msg) { // Create array for payload int readableBytes = msg.getPayload().readableBytes(); byte[] payload = new byte[readableBytes]; // Read bytes from payload for (int i = 0; i < readableBytes; i++) { payload[i] = msg.getPayload().readByte(); } // Create string from payload String decodedPayload = new String(payload, UTF_8); log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload); } }
Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:
// Start broker Broker broker = new Broker(); broker.startServer(); // Bind a shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("Stopping MQTT broker..."); broker.stopServer(); }));
Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.