Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.12.1</version>
</dependency>
Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.
public final class Broker {
private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());
final Server mqttBroker = new Server();
public void startServer() {
// Load class path for configuration
IResourceLoader classpathLoader = new ClasspathResourceLoader();
final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);
// Start MQTT broker
log.info("Start MQTT broker...");
List extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
try {
mqttBroker.startServer(classPathConfig, userHandlers);
} catch (IOException e) {
log.error("MQTT broker start failed...");
}
// Wait before publish topics
log.info("Wait before topics are pushed...");
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
log.warn("Pause for publishing topics interupted.");
}
// Publishing topics
log.info("Pushing topics...");
pushTopic("/exit");
log.info("Topics pushed...");
}
public void stopServer() {
mqttBroker.stopServer();
}
public void pushTopic(String topic) {
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName(topic)
.retained(true)
.qos(MqttQoS.EXACTLY_ONCE)
.payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8)))
.build();
mqttBroker.internalPublish(message, "INTRLPUB");
}
}
Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:
##############################################
# Moquette configuration file.
#
# The syntax is equals to mosquitto.conf
#
##############################################
port 1883
#websocket_port 8080
host 0.0.0.0
#Password file
#password_file password_file.conf
#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv
allow_anonymous true
Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:
public class PublisherListener extends AbstractInterceptHandler {
private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());
@Override
public String getID() {
return "PublishListener";
}
@Override
public void onPublish(InterceptPublishMessage msg) {
// Create array for payload
int readableBytes = msg.getPayload().readableBytes();
byte[] payload = new byte[readableBytes];
// Read bytes from payload
for (int i = 0; i < readableBytes; i++) {
payload[i] = msg.getPayload().readByte();
}
// Create string from payload
String decodedPayload = new String(payload, UTF_8);
log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
}
}
Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:
// Start broker
Broker broker = new Broker();
broker.startServer();
// Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Stopping MQTT broker...");
broker.stopServer();
}));
Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.