MQTT-Broker in Java einbinden

Zur Nutzung des MQTT-Protokolls wird ein MQTT-Broker benötigt. Dieser kann separat betrieben oder aber in eine Anwendung eingebettet werden. Ein MQTT-Broker, welcher sich für die Einbettung unter Java eignet, ist Moquette. Zur Einbindung von Moquette muss es den Abhängigkeiten des Projektes hinzugefügt werden:

<dependency>
	<groupId>io.moquette</groupId>
	<artifactId>moquette-broker</artifactId>
	<version>0.12.1</version>
</dependency>

Die Funktionalitäten zum Start und Stop des MQTT-Brokers werden in diesem Beispiel in der Klasse Broker gekapselt. In der Klasse wird eine Instanz vom Typ Server angelegt und über die Methode startServer kann der MQTT-Broker gestartet werden. Beim Start wird in diesem Beispiel das Topic /exit angelegt. Weitere Topics können über die Methode pushTopic angelegt werden. Mittels der Methode stopServer kann der Broker wieder gestoppt werden.

public final class Broker {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    final Server mqttBroker = new Server();

    public void startServer() {

        // Load class path for configuration
        IResourceLoader classpathLoader = new ClasspathResourceLoader();
        final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);

        // Start MQTT broker
        log.info("Start MQTT broker...");
        List userHandlers = Collections.singletonList(new PublisherListener());

        try {
            mqttBroker.startServer(classPathConfig, userHandlers);
        } catch (IOException e) {
            log.error("MQTT broker start failed...");
        }

        // Wait before publish topics
        log.info("Wait before topics are pushed...");

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            log.warn("Pause for publishing topics interupted.");
        }

        // Publishing topics
        log.info("Pushing topics...");

        pushTopic("/exit");

        log.info("Topics pushed...");
    }

    public void stopServer() {
        mqttBroker.stopServer();
    }

    public void pushTopic(String topic) {

        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .retained(true)
                .qos(MqttQoS.EXACTLY_ONCE)
                .payload(Unpooled.copiedBuffer("{}".getBytes(UTF_8)))
                .build();

        mqttBroker.internalPublish(message, "INTRLPUB");
    }
}

Beim Start des Servers wird eine Konfigurationsdatei mit dem Namen moquette.conf aus dem Ordner resources/config geladen. Diese könnte beispielhaft wie folgt aussehen:

##############################################
#  Moquette configuration file. 
#
#  The syntax is equals to mosquitto.conf
# 
##############################################

port 1883

#websocket_port 8080

host 0.0.0.0

#Password file
#password_file password_file.conf

#ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv

allow_anonymous true

Beim Start des MQTT-Brokers wird ein Handler vom Typ PublisherListener registriert. Diese Handler muss natürlich vorher definiert werden:

public class PublisherListener extends AbstractInterceptHandler {

    private static Logger log = LoggerFactory.getLogger(new Exception().fillInStackTrace().getStackTrace()[0].getClassName());

    @Override
    public String getID() {
        return "PublishListener";
    }

    @Override
    public void onPublish(InterceptPublishMessage msg) {

        // Create array for payload
        int readableBytes = msg.getPayload().readableBytes();
        byte[] payload = new byte[readableBytes];

        // Read bytes from payload
        for (int i = 0; i < readableBytes; i++) {
            payload[i] = msg.getPayload().readByte();
        }

        // Create string from payload
        String decodedPayload = new String(payload, UTF_8);
        log.debug("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    }
}

Der Handler wertet alle Publish-Nachrichten aus und loggt diese mittels des Loggers. Nun kann der Broker gestartet werden:

// Start broker
Broker broker = new Broker();
broker.startServer();

// Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

	log.info("Stopping MQTT broker...");
	broker.stopServer();
}));

Über den registrierten Shutdown-Hook kann der Server anschließend wieder beendet werden. Damit ist die beispielhafte Einbindung von Moquette in eine Java-Applikation abgeschlossen. Der Quelltext von Moquette kann über GitHub bezogen werden. Lizenziert ist das Projekt unter der Apache License in Version 2 und damit freie Software.

Chronologische Abfrage unter Elasticsearch

Elasticsearch ist eine quelloffene Suchmaschine. Über eine Query kann eine Suche spezifiziert werden, welche für die entsprechenden Ergebnisse sorgt. Manchmal soll allerdings die Ausgabe der Daten eines Index von Elasticsearch chronologisch erfolgen. Dies ist z.B. sinnvoll, wenn Logdaten oder ähnliches in diesem gespeichert werden. In einem solchen Fall sollen sie unter Umständen chronologisch ähnlich dem Aufruf tail -f angezeigt werden. Dazu dient folgender Request:

GET https://elastic.example.org:9200/index/_search

Zusätzlich muss die eigentliche Query im Body der Anfrage, als JSON hinterlegt werden:

{
  "query": {
    "match_all": {}
  },
  "size": 10,
  "sort": [
    {
      "timestamp": {
        "order": "desc"
      }
    }
  ]
}

Das hier für die Sortierung genutzte Feld timestamp muss in den Daten enthalten sein, sonst funktioniert die Sortierung nicht. Nach dem Absetzen der Query, erhält der Nutzer zehn Einträge, beginnend mit den neusten Einträgen.

Zugriff auf MAMP-Logs

MAMP (Mac, Apache, MySQL und PHP) ist ein Paket zum lokalen Betrieb eines Webservers für Test- und Entwicklungszwecke. Neben der ursprünglichen Version für macOS, existiert mittlerweile auch eine Version für Windows. Bei der Entwicklung von Software z.B. einer PHP-Anwendung ist es hilfreich die Logs der einzelnen Bestandteile von MAMP nutzen zu können. Unter macOS befinden sich die Logs in dem Ordner:

/Applications/MAMP/logs/

In diesem Ordner sind eine Reihe von Dateien zu finden. Die einzelnen Logdateien tragen folgende Dateinamen:

/Applications/MAMP/logs/apache_error.log 
/Applications/MAMP/logs/mysql_error_log.err
/Applications/MAMP/logs/nginx_access.log 
/Applications/MAMP/logs/nginx_error.log 
/Applications/MAMP/logs/php_error.log

Die Nginx-Log-Dateien tauchen nur auf, wenn Nginx als Webserver in den Einstellungen von MAMP verwendet wurde. In der Windows-Version befinden sich die Log-Dateien ebenfalls im log-Ordner, welcher sich wiederum im MAMP-Installationsordner befindet.

Rewrites unter Nginx loggen

In der Konfiguration für den Webserver Nginx bzw. dessen Seiten, ist es möglich Rewrite-Direktiven zu nutzen. Damit können URLs umgeschrieben werden. Ein Beispiel für eine solche Direktive wäre z.B.

try_files $uri $uri/ @rewrite;

location @rewrite {
  rewrite ^/(.*)$ /index.php?_url=/$1;
}

Wenn dieser Rewrite zur Anwendung kommt, erscheint leider keine Meldung in den Nginx-Logs access.log und error.log. Um das Logging für Rewrites zu aktivieren, muss die Option:

rewrite_log on;

im server-Block der Konfiguration, welche unter /etc/nginx/sites-available/ zu finden ist, aktiviert werden. Daneben muss die Logging-Severity, also die Schwere der Events die geloggt werden sollen, angepasst werden. Dazu wird folgende Option dem server-Block hinzugefügt:

error_log /var/log/nginx/error.log notice;

Wenn die Log-Datei nun angeschaut wird, so finden sich dort für jeden Rewrite entsprechende Meldungen:

2019/05/29 08:35:30 [notice] 16054#16054: *127817 “^/(.*)$” matches “/name/german”, client: 82.193.248.37, server: api.example.com, request: “GET /name/german HTTP/1.1”, host: “api.example.com”
2019/05/29 08:35:30 [notice] 16054#16054: *127817 rewritten data: “/index.php”, args: “_url=/name/german”, client: 82.193.248.37, server: api.example.com, request: “GET /name/german HTTP/1.1”, host: “api.example.com”

Garbage Collection Log erstellen und auswerten

In regelmäßigen Abstand führen Java-Applikationen eine sogenannte Garbage Collection durch. Dabei werden Objekte welche nicht mehr benötigt werden entfernt und somit der Speicher der Anwendung bereinigt. Die Ausführung der Garbage Collection kann während der Ausführung der Applikation mitgeloggt werden:

java -Xloggc:garbage.log -jar application.jar

Der Parameter -Xloggc spezifiziert die Datei in welche das Log geschrieben wird. Mit Hilfe dieses Logs können Probleme der Applikationen analysiert werden. Ein Beispiel für ein solches Log könnte z.B. so aussehen:

Java HotSpot(TM) 64-Bit Server VM (25.144-b01) for bsd-amd64 JRE (1.8.0_144-b01), built on Jul 21 2017 22:07:42 by "java_re" with gcc 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00)
Memory: 4k page, physical 8388608k(122952k free)

/proc/meminfo:

CommandLine flags: -XX:InitialHeapSize=134217728 -XX:MaxHeapSize=2147483648 -XX:+PrintGC -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseParallelGC 
4,347: [GC (Allocation Failure)  33280K->8385K(125952K), 0,0125825 secs]
4,475: [GC (Metadata GC Threshold)  12196K->8961K(125952K), 0,0061766 secs]
4,481: [Full GC (Metadata GC Threshold)  8961K->6026K(87552K), 0,0206784 secs]
16,232: [GC (Allocation Failure)  39306K->15981K(87552K), 0,0095999 secs]

Zur Auswertung kann, neben vielen anderen Tools, der Webdienst GCeasy, welcher sich selbst als Universal GC Log Analyzer bezeichnet, genutzt werden.

Die Analyse des Garbage Collection Logs

Mit Hilfe der Auswertung können eventuelle Probleme im Zusammenhang mit der Garbage Collection besser eingegrenzt und analysiert werden. Zu finden ist der Dienst unter gceasy.io.