Skip to main contentIBM Garage Event-Driven Reference Architecture - Reefer Container Shipment reference implementation

Reactive Messaging

Reactive Systems provide an architecture style to deliver responsive systems. By infusing asynchronous messaging passing at the core of the system, applications enforcing the reactive system’s characteristics are inherently resilient and become more elastic by scaling up and down the number of message consumers. Microservices as part of reactive systems interact using messages. The MicroProfile Reactive Messaging specification aims to deliver applications embracing the characteristics of reactive systems.

Reactive Systems provide an architecture style to deliver responsive systems. By infusing asynchronous messaging passing at the core of the system, applications enforcing the reactive system’s characteristics are inherently resilient and become more elastic by scaling up and down the number of message consumers.

reactive systems

Microservices as part of reactive systems interact using messages. The location and temporal decoupling, promoted by this interaction mechanism, enable numerous benefits such as:

  • Better failure handling as the temporal decoupling enables message brokers to resend or reroute messages in the case of remote service failures.
  • Improved elasticity as under fluctuating load the system can decide to scale up and down some of the microservices.
  • The ability to introduce new features more easily as components are more loosely coupled by receiving and publishing messages.

Use Cases

MicroProfile Reactive Messaging aims to provide a way to connect event-driven microservices. The key characteristics of the specification make it versatile and suitable for building different types of architecture and applications.

First, asynchronous interactions with different services and resources can be implemented using Reactive Messaging. Typically, asynchronous database drivers can be used in conjunction with Reactive Messaging to read and write into a data store in a non-blocking and asynchronous manner.

When building microservices, the CQRS and event-sourcing patterns provide an answer to the data sharing between microservices. Reactive Messaging can also be used as the foundation to CQRS and Event-Sourcing mechanism, as these patterns embrace message-passing as core communication pattern.

IOT applications, dealing with events from various devices, and data streaming applications can also be implemented using Reactive Messaging. The application receives events or messages, process them, transform them, and may forward them to another microservices. It allows for more fluid architecture for building data-centric applications.

MicroProfile Reactive Messaging

MicroProfile Reactive Messaging provides a very easy-to-use way to send, receive, and process messages and is well-suited to writing applications that process streams of events. With MicroProfile Reactive Messaging, you annotate application beans’ methods and, under the covers, OpenLiberty (in our specific case) converts these to reactive streams-compatible publishers, subscribers and processors and connects them up to each other.

reactive messaging

Although sending messages within our application is nice, it’s more useful to be able to send and receive messages from other systems. For this, MicroProfile Reactive Messaging also provides a Connector API to allow your methods to be connected to external messaging systems. Open Liberty includes the liberty-kafka connector for sending and receiving messages from an Apache Kafka broker.

It is important to understand that MicroProfile Reactive Messaging does not contain an implementation itself but only provides the specified API, a TCK and documentation. It is down to application server or external libraries to provide such implementation. For instance, Open Liberty 19.0.0.9 provides a full implementation of MicroProfile Reactive Messaging 1.0. OpenLiberty’s implementation of the MicroProfile Reactive Messaging capabilities is based on SmallRye Reactive Messaging.

Implement

We have based our implementation on MicroProfile as the standardization for building microservices-based applications. The application server we have chosen, that provides full MicroProfile support, is OpenLiberty.

As already introduced, the scoring agent will react to a stream of data, in the form of messaging being sent from an IOT device in the Reefer Containers to Kafka (through an MQTT connector), which is used as our event backbone. The scoring agent will react to that stream of data being sent into a kafka topic through the MicroProfile Reactive Messaging feature that OpenLiberty implements and supports. It will do some computation for each of the messages coming as a stream of data, which in this case is to call the container anomaly predictive system, and send out a container anomaly message a to specific kafka topic where the Reefer Container EDA reference application listens to and will act in consequence.

The main chunk of code that implements the scoring agent use case just described looks like this:

@Incoming("reefer-telemetry")
@Outgoing("containers")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> processTelemetry(Message<String> message) {
// Get the message as String
String input = message.getPayload();
Gson g = new Gson();

We are consuming/producing the data from/to Kafka by simply annotating our method with @Incoming and @Outgoing and leaving the rest of the magic required to create the reactive streams to the MicroProfile Reactive Messaging implementation of the OpenLiberty server. Although there is certain magic in this process, there is still some configuration we need to provide OpenLiberty with in order for it to properly create the reactive streams.

This configuration is provided in the microprofile-configuration.properties file:

# Config specific to reefer-telemetry kafka topic
mp.messaging.incoming.reefer-telemetry.connector=liberty-kafka
mp.messaging.incoming.reefer-telemetry.group.id=reefer-telemetry-reactive
mp.messaging.incoming.reefer-telemetry.topic=reefer-telemetry
mp.messaging.incoming.reefer-telemetry.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.reefer-telemetry.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Config specific to containers kafka topic
mp.messaging.outgoing.containers.connector=liberty-kafka

We can see the configuration for each of the incoming and outgoing reactive streams based on their names (reefer-telemetry and containers) that match with the Kafka topics these reactive streams will be consuming/producing from/to. We also specify the serializer/deserializer we want MicroProfile Reactive Messaging to apply to our reactive streams for data processing but more importantly is the fact that we are telling OpenLiberty that the connector to be used is the liberty-kafka connector. Otherwise, OpenLiberty would not be able to interact with Kafka.

Finally, we are providing to the liberty-kafka connector the appropriate configuration for OpenLiberty to successfully connect and consume/produce data to Kafka. In the example above we are providing the Kafka bootstrap server but there are other properties that can be set here (such as security) that can be found in the MicroProfile Reactive Messaging Specification