Intrexx Industrial - Service bus

1. Foreword

The functionality of the Process Designer has been expanded for Intrexx Industrial: a so-called service bus has been added, which is used to exchange messages between processes. These messages are exchanged via a publish-subscribe model, as is the case with JMS or MQTT. The service bus is available in Intrexx Version 18.03 with Online Update 02 or higher.

What are the benefits of the service bus? Imagine you want to create an Intrexx application that processes data from an IoT device. This is relatively easy to implement based on MQTT integration. However, if you want to create an application that should work with as many different devices as possible - and each device sends a different message - you need to have a way of transferring these messages in a standardized format. The service bus comes into play here: you create an application with processes that process a precisely defined message. You now only need a device-dependent process that transforms the message into the desired format. This means the application is no longer dependent on a specific device. With the service bus, processes can be implemented that respond to very specific messages.
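
The device-dependent transformation step described above can be sketched as follows. This is only an illustration in plain JavaScript and not Intrexx code: the device types, the payload fields and the function "normalizeMessage" are hypothetical. In an Intrexx process, such a transformation would more likely be placed in a Groovy script action before the message is published.

```javascript
// Hypothetical sketch: map device-specific payloads to one standardized format.
// Device types and field names are assumptions made for illustration only.
const transformers = {
    // "deviceA" is assumed to send degrees Celsius in the field "temp"
    deviceA: (raw) => ({ sensor: raw.id, temperatureCelsius: raw.temp }),
    // "deviceB" is assumed to send degrees Fahrenheit in the field "t_f"
    deviceB: (raw) => ({ sensor: raw.name, temperatureCelsius: (raw.t_f - 32) * 5 / 9 })
};

// Looks up the transformer for the device type and applies it to the raw payload.
function normalizeMessage(deviceType, raw) {
    const transform = transformers[deviceType];
    if (!transform) {
        throw new Error("No transformer registered for device type: " + deviceType);
    }
    return transform(raw);
}

const a = normalizeMessage("deviceA", { id: "s1", temp: 21.5 });
const b = normalizeMessage("deviceB", { name: "s2", t_f: 212 });
console.log(a, b);
```

Processes that consume the standardized message then only need to know the fields "sensor" and "temperatureCelsius", regardless of which device produced the data.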

2. Send message to the service bus

The following example demonstrates a process that publishes a message on the service bus and then responds to that message. If you want to reconstruct the example, please proceed as follows:
  1. Import the certificate for the Test MQTT Server as described here.
  2. Click here to download the example application and process, and then import these as usual. Activate the process so that all functions are available.
  3. Messages are only generated when the global timer in the process is activated. Open the process to do this. Once you activate the timer via the Edit menu / Activate element, it will run according to the configured execution time. Please do not forget to deactivate the timer afterwards, otherwise a lot of values will be written to your database. Alternatively, you can start the timer manually via the Edit menu / Start global timer job to test the process. This option generates a manageable number of messages.


The first process chain - left in the picture - that sends the message starts with a global timer. A timer event handler is connected to this that responds to events from the global timer. The next process step is a Groovy script action with the following script:
//write message
g_sharedState.ixbrokerMessage = "test message"
Any object can be assigned to the variable "g_sharedState.ixbrokerMessage". This value is automatically used by the service bus action in the next process step. No further assignment is required.



The class "de.uplanet.lucy.server.broker.event.action.IxBrokerWorkflowAction" is selected in the generic action. A value of your choice can be defined as the name of your message with the property "topicToPublishTo". In this example, the name is "testtopic".

With the property "onlyPushOnChange" and the value "true", you can specify that the topic should only be refreshed if the new value differs from the current value on the bus.
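
The publish behavior described above can be illustrated with a small in-memory sketch. It is not the Intrexx implementation: the class "MiniBus" and its methods are hypothetical, and the sketch assumes that "onlyPushOnChange" suppresses an update when the new value equals the value last published to the topic.

```javascript
// Hypothetical in-memory bus for illustration; NOT the Intrexx service bus.
class MiniBus {
    constructor() {
        this.lastValues = new Map();   // topic -> last published value
        this.subscribers = new Map();  // topic -> array of callbacks
    }

    // Registers a callback that is called for every update of the topic.
    subscribe(topic, callback) {
        if (!this.subscribers.has(topic)) {
            this.subscribers.set(topic, []);
        }
        this.subscribers.get(topic).push(callback);
    }

    // Returns true if the topic was updated, false if the update was skipped.
    publish(topic, message, onlyPushOnChange = false) {
        const unchanged = this.lastValues.has(topic) && this.lastValues.get(topic) === message;
        if (onlyPushOnChange && unchanged) {
            return false; // same value as before: skip the update
        }
        this.lastValues.set(topic, message);
        (this.subscribers.get(topic) || []).forEach((cb) => cb(message));
        return true;
    }
}

const bus = new MiniBus();
const received = [];
bus.subscribe("testtopic", (msg) => received.push(msg));

bus.publish("testtopic", "test message", true); // delivered
bus.publish("testtopic", "test message", true); // skipped, value unchanged
bus.publish("testtopic", "new message", true);  // delivered
console.log(received);
```

Under this assumption, a timer that publishes the same value repeatedly triggers subscribed event handlers only once per distinct value.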

3. Respond to a message from the service bus




In the second part of the process, the class "de.uplanet.lucy.server.broker.event.handler" is selected in the properties of the generic event handler. The desired topic (in our example "testtopic") can be specified as the value of the property "topicToListen". This topic will then be subscribed to automatically.



You can retrieve the value from the service bus using a Groovy script action or a Groovy condition, as in our example. The event handler triggers this process step as soon as the value of the subscribed topic is updated on the service bus. The following script is then executed:
    String msg = g_event.message

    g_syslog.info("Msg received from IxBroker: " + msg)

    g_sharedState.topicvalue = msg

    return go
You can now process this value as you like, for example by adding more information (and then putting it onto the service bus again as a new topic) or by writing it to the database in a subsequent process step, as is the case in our example.



In the subsequent data group action, two user-defined values from the processing context are assigned to the fields "Title" and "Value" of the connected application. The value "topicvalue" comes from the preceding Groovy script.

4. Respond to a message from the service bus on the client side

Especially in IoT environments, it may be necessary to update a specific page or specific elements of a page periodically. With the service bus, you can respond directly to changed values of a specific topic.



Add the following JavaScript to a page of your choice (in our example, we have added it to the "All Entries" page).
function startPolling()
{
    registerAjaxPolling({
        /*
         * topicName: <NAME OF THE SERVICEBUS_TOPIC>
         * numberOfValues: maximum number of messages since the last request
         */
        key: "ixBrokerPolling&topicName=testtopic&numberOfValues=1",

        success: function(json, poller) {
            checkReload(json, poller);
        },

        successNoChange: function(json, poller) {
            checkReload(json, poller);
        },

        error: function() {
            errorOnPolling();
        },

        /*
         * id: unique ID for this polling sequence
         */
        id: "testId",
        millisec: 5000
    });
}


function checkReload(json, poller)
{
    var resultStr = "";

    // The returned JSON contains an array if numberOfValues > 1
    if (json.values) {
        var jsonArray = json.values;

        for (var i = 0; i < jsonArray.length; i++) {
            resultStr += jsonArray[i].value + "\n";
        }
    }

    // Otherwise a single value is returned
    if (json.value) {
        resultStr = json.value;
    }

    /*
     * Add your code here...
     */
    alert(resultStr);
}
With this script, you can access the service bus and the desired topic directly. In the example code, the topic "testtopic" is subscribed to and checked periodically. If there is an update, the JavaScript function "checkReload(json, poller)" is called automatically. You can perform the actions of your choice there. Please refer to the documents listed under "More information" for further details.
Additional server configurations do not need to be made in this case. Unlike in the polling document, additional Spring configurations are not required here.
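
If you want to process the polled values instead of displaying them with "alert", a small helper can first collect them into an array. The following is a minimal sketch: the function name "extractTopicValues" is hypothetical, and the response shape (either "json.values" as an array of objects with a "value" property, or a single "json.value") is taken from the example above. Unlike the example, the helper also keeps falsy values such as 0 or an empty string.

```javascript
// Collects the topic values from a polling response into an array.
// Assumed response shape (from the example above): either json.values,
// an array of objects with a "value" property, or a single json.value.
function extractTopicValues(json) {
    if (json && Array.isArray(json.values)) {
        return json.values.map((entry) => entry.value);
    }
    if (json && json.value !== undefined) {
        return [json.value];
    }
    return []; // nothing usable in the response
}

console.log(extractTopicValues({ values: [{ value: "a" }, { value: "b" }] }));
console.log(extractTopicValues({ value: "test message" }));
```

Such a helper could be called at the beginning of "checkReload(json, poller)", so that the rest of the function works with a plain array in both cases.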

function stopPolling()
{
    unregisterAjaxPolling("ixBrokerPolling&topicName=testtopic&numberOfValues=1", "testId");
}


function errorOnPolling()
{
    Notifier.status.notify("Text: Error on retrieving reload information", "Title: Polling", "ID: MyId");
}
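The function "stopPolling()" deactivates the polling registration when you leave the "All Entries" page and thus ends the polling. The function "errorOnPolling()" displays a notification if an error occurs during polling.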

5. More information

  1. Intrexx Industrial
  2. Test MQTT Broker
  3. MQTT in Intrexx
  4. Polling
  5. Increased performance by only refreshing specific elements on a page
  6. Complete Java Sample Code for a seamless integration in the Processes module (event source, event handler and action)
  7. Configuration of an MQTT Message Broker
  8. http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html