Intrexx Industrial - Servicebus

1. Vorwort

Für Intrexx Industrial wurde die Funktionalität des Prozessdesigners erweitert. Neu hinzugekommen ist der sogenannte Servicebus, mit dem Nachrichten zwischen Prozessen ausgetauscht werden können. Diese Nachrichten werden – ähnlich wie bei JMS oder MQTT - über ein Publish/Subscriber Modell abgebildet. Der Service-Bus steht ab Intrexx Version 18.03 mit Online Update 02 zur Verfügung.

Welche Vorteile bringt der Servicebus? Stellen Sie sich vor, Sie möchten eine Intrexx-Applikation erstellen, die Daten eines IoT-Devices weiterverarbeitet. Das können Sie auf Basis der MQTT-Integration bereits relativ einfach umsetzen. Wenn Sie jedoch eine Applikation erstellen möchten, die mit möglichst vielen unterschiedlichen Devices arbeiten soll – und jedes Device eine unterschiedliche Nachricht schickt – müssen Sie eine Möglichkeit haben, diese Nachrichten in ein einheitliches Format zu überführen. An dieser Stelle kommt der Servicebus ins Spiel: Sie erstellen eine Applikation mit Prozessen, die eine genau definierte Nachricht verarbeiten. Jetzt benötigen Sie nur noch einen Device-abhängigen Prozess, der die Nachricht in das gewünschtes Format bringt. Die Applikation ist dadurch losgelöst von einem spezifischen Device. Mit dem Servicebus können Prozesse realisiert werden, die auf ganz bestimmte Nachrichten reagieren.

2. Nachricht an den Servicebus senden

Im folgenden Beispiel zeigen wir einen Prozess, der eine Nachricht auf dem Servicebus bereitstellt und auch auf den Servicebus reagiert. Wenn Sie das Beispiel nachvollziehen wollen, gehen Sie bitte wie folgt vor:
  1. Importieren Sie das Zertifikat für den Test-MQTT-Server wie hier beschrieben.
  2. Hier können Sie Beispielapplikation und -Prozess herunterladen und wie gewohnt importieren. Aktivieren Sie den Prozess beim Import, damit alle Funktionen zur Verfügung stehen.
  3. Nachrichten werden erst erzeugt, wenn der Globale Timer im Prozess aktiviert wird. Öffnen Sie dazu den Prozess. Wenn Sie den Timer über das Hauptmenü Bearbeiten / Element aktivieren aktivieren, wird er gemäß dem eingestellten Ausführungszeitpunkt ausgeführt. Vergessen Sie bitte nicht, den Timer wieder zu deaktivieren, da sonst sehr viele Werte in Ihre Datenbank geschrieben werden. Alternativ kann der Timer auch über das Hauptmenü Bearbeiten / Globalen Timerjob starten manuell gestartet werden, um den Prozess zu testen - diese Variante erzeugt eine kontrollierbare Menge von Nachrichten.


Die erste Prozesskette - hier links im Bild - mit der eine Nachricht gesendet wird, startet mit dem Globalen Timer. Mit ihm ist ein Timer-Ereignisbehandler verbunden, der auf die Ereignisse des globalen Timers reagiert. Der nächste Prozessschritt ist eine Groovy-Aktion mit dem folgenden Skript:
//write message
g_sharedState.ixbrokerMessage = "test message"
Der Variable "g_sharedState.ixbrokerMessage" kann jedes beliebige Objekt zugeordnet werden. Dieser Wert wird automatisch von der im nächsten Prozessschritt folgenden Servicebus-Aktion verwendet. Es muss keine weitere Zuordnung erfolgen.



In der Generischen Aktion ist die Klasse "de.uplanet.lucy.server.broker.event.action.IxBrokerWorkflowAction" ausgewählt. Mit der Eigenschaft "topicToPublishTo" kann ein beliebiger Wert als Name Ihrer Nachricht festgelegt werden. In diesem Beispiel ist der Name "testtopic".

Mit der Eigenschaft "onlyPushOnChange" und dem Wert "true" können Sie steuern, ob das Topic auch aktualisiert werden soll, wenn der aktuelle Wert auf dem Bus unterschiedlich ist.

3. Auf eine Nachricht aus dem Servicebus reagieren




Im zweiten Teil des Prozesses ist die Klasse "de.uplanet.lucy.server.broker.event.handler" in den Eigenschaften des generischen Ereignisbehandlers ausgewählt. Mit der Eigenschaft "topicToListen" kann als Wert das gewünschte Topic (hier im Beispiel "testtopic") angegeben werden. Dieses Topic wird damit automatisch abonniert.



Mit einer Groovy-Aktion oder wie hier im Beispiel einer Groovy-Bedingung holen Sie den Wert aus dem Servicebus ab. Dieser Prozessschritt wird vom Ereignisbehandler aufgerufen, sobald ein Wert im Servicebus des abonnierten Topics aktualisiert wird. Das folgende Skript wird dann ausgeführt:
    String msg = g_event.message

    g_syslog.info("Msg received from IxBroker: " + msg)

    g_sharedState.topicvalue = msg

    return go
Diesen Wert können Sie nun nach Belieben weiterverarbeiten, mit Informationen anreichern (und dann wieder als neues Topic auf den ServiceBus legen) oder wie hier im Beispiel mit dem nächsten Prozessschritt in die Datenbank schreiben.



Die folgende Datengruppen-Aktion werden zwei benutzerdefinierte Werte aus dem Verarbeitungskontext den Feldern "Titel" und "Value" aus der verbundenen Beispielapplikation zugeordnet: Der Wert "topicvalue" stammt aus dem vorausgehenden Groovy-Skript.

3. Clientseitig auf eine Nachricht aus dem Servicebus reagieren

Gerade in IoT-Umgebungen kann es vorkommen, dass eine bestimmte Seite bzw. Elemente einer Seite periodisch aktualisiert werden sollen. Über den Servicebus haben Sie die Möglichkeit, direkt auf geänderte Werte eines gewünschten Topics zu reagieren.



Fügen Sie dazu folgendes Javascript auf der gewünschten Seite (hier in der Beispielapplikation die Seite "Alle Einträge") ein.
function startPolling()
    {
		registerAjaxPolling({
				/*
				* topicName: <NAME OF THE SERVICEBUS_TOPIC>
				* numberOfValues: Maximum amount of Messages since last request
				*/
				key: "ixBrokerPolling&topicName=testtopic&numberOfValues=1",
				success: function(json, poller){
						checkReload(json, poller);
				},

				successNoChange: function(json, poller){
						checkReload(json, poller);
				},

				error: function(){
						errorOnPolling();
				},
				/*
				* id: unique ID for this polling sequence
				*/
				id: "testId",
				millisec: 5000
		});
    }


    function checkReload(json, poller)
    {
        var resultStr = ""
		//returned json contains an array, if numberOfValues > 1
		
		if (json.values){

			var jsonArray = json.values

			for (var i = 0; i < jsonArray.length; i++) {
               resultStr += jsonArray[i].value + "\n";
            }
         }

        if (json.value){
            resultStr = json.value;
        }
		
		/*
		* Add your code here...
		*/
		alert(resultStr);
    }
Mit diesem Script können Sie direkt auf den Servicebus und das gewünschte Topic zugreifen. Im Beispielcode wird das Topic "testtopic" abonniert und periodisch überprüft. Tritt eine Aktualisierung ein, wird automatisch die Javascript-Funktion "checkReload(json, poller)" aufgerufen. Dort können Sie Ihre gewünschten Aktionen durchführen. Beachten Sie in diesem Zusammenhang auch die Dokumente
Es muss keine weitere Konfiguration auf dem Server erfolgen. Abweichend zur Beschreibung in der Polling-Dokumentation ist es nicht erforderlich, zusätzliche Spring-Konfigurationen anzulegen.

function stopPolling()
    {
            unregisterAjaxPolling("ixBrokerPolling&topicName=testtopic&numberOfValues=1", "testId");
    }


    function errorOnPolling()
    {
            Notifier.status.notify("Text: Error on retrieving reload information", "Titel: Polling", "ID: MyId");
    }
Diese Funktion trägt die Polling-Registrierung beim Verlassen der Seite "Alle Einträge" aus und stoppt damit das Polling.

5. Weitere Informationen

  1. Intrexx Industrial
  2. Test-MQTT-Broker
  3. MQTT in Intrexx
  4. Polling
  5. Seiteninhalte mit JavaScript aktualisieren
  6. Konfiguration eines MQTT-Message Brokers
  7. Kompletter Java Sample Code für eine nahtlose Integration im Modul Prozesse (Ereignisquelle, Ereignisbehandler und Aktion)
  8. http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html