IBM Garage Event-Driven Reference Architecture

Real-time Inventory, powered by AMQ Streams

What you will learn

  • Use Quarkus, with a reactive programming API (Mutiny) and the Kafka API, to produce messages to Kafka (see the producer sketch after this list)
  • Use the same Quarkus app to generate messages to RabbitMQ using the AMQP API
  • Use the same Quarkus app to generate messages to IBM MQ using JMS
  • Use Quarkus and Kafka Streams to compute aggregates to build an inventory view from the stream of sale order events
  • Use Tekton to build each service
  • Use the RabbitMQ source connector from the IBM messaging open source contribution
  • Use the IBM MQ sink connector from the IBM messaging open source contribution
  • Use the JDBC sink connector from the IBM messaging open source contribution
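
To give a feel for the producer-side programming model, here is a minimal sketch of a Quarkus bean using Mutiny and SmallRye Reactive Messaging to emit sale events to Kafka. This is not the store simulator code: the channel name item-sales, the package name, and the ItemSale payload class are assumptions made for this sketch, and the channel-to-topic mapping is only shown as comments.

```java
package org.acme.sales; // hypothetical package for this sketch

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class ItemSaleGenerator {

    private final Random random = new Random();

    // Emit one random sale event per second on the "item-sales" channel.
    // The channel is bound to a Kafka topic in application.properties, e.g.:
    //   mp.messaging.outgoing.item-sales.connector=smallrye-kafka
    //   mp.messaging.outgoing.item-sales.topic=items
    @Outgoing("item-sales")
    public Multi<ItemSale> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(tick -> new ItemSale("Store_" + (1 + random.nextInt(5)),
                                          "Item_" + (1 + random.nextInt(7)),
                                          1 + random.nextInt(10)));
    }

    // Simple event payload for the sketch; the real simulator uses its own domain classes.
    public static class ItemSale {
        public String storeName;
        public String sku;
        public int quantity;

        public ItemSale(String storeName, String sku, int quantity) {
            this.storeName = storeName;
            this.sku = sku;
            this.quantity = quantity;
        }
    }
}
```

In the actual simulator, the RabbitMQ (AMQP) and IBM MQ (JMS) paths use their respective client APIs; the store simulator repository referenced later in the solution anatomy shows how they are wired.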

Overview

This scenario implements a simple real-time inventory management solution based on real-life MVPs we developed in 2020. Stores send their sale transactions to a central messaging platform based on queues or topics; with the adoption of loosely coupled microservices and real-time analytics, Kafka is part of the architecture. Adopting Kafka Connect helps to integrate with existing applications without any change to their code base.

hl view

This scenario addresses multiple use cases which build an end-to-end modern data pipeline solution from source to different potential sinks:

  • The store simulator injects sale events directly into Kafka, on the items topic
  • The store simulator can also send messages to IBM MQ using the JMS API, or to RabbitMQ using the AMQP protocol, so we can demonstrate the Kafka connectors with a RabbitMQ source or an IBM MQ source.
  • When messages are sent to queues, a Kafka source connector is used to propagate them to the items topic.
  • The inventory computation is done with a Kafka Streams component which produces inventory events and keeps an in-memory stock per item. This item inventory aggregator is exposed by an API and uses the interactive query capability of Kafka Streams to serve item stock over HTTP (see the topology sketch after this list).
  • Different sink connectors can be plugged into the item.inventory topic to move data to DB2, Elastic Search, Cloud Object Storage…
  • To compute store inventory, another aggregator generates per-store aggregates on the store.inventory topic.
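
To make the aggregation step more concrete, here is a minimal Kafka Streams topology sketch. It is not the code of the item aggregator (the refarch-eda-item-inventory project referenced later holds the real implementation); the topic names match the scenario, but the String-encoded event value and the item-inventory-store state store name are simplifying assumptions.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class ItemInventoryTopology {

    // Consumes sale events from the "items" topic, keeps a running stock per
    // item in a state store, and publishes each update to "item.inventory".
    public Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Simplification for this sketch: the event value is "itemId,quantity"
        // as a plain String; the real aggregator uses JSON events and custom serdes.
        KStream<String, String> sales = builder.stream("items",
                Consumed.with(Serdes.String(), Serdes.String()));

        KTable<String, Long> itemStock = sales
                .map((key, value) -> {
                    String[] parts = value.split(",");
                    return KeyValue.pair(parts[0], Long.parseLong(parts[1]));
                })
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce(Long::sum,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("item-inventory-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));

        // Every stock change is emitted downstream for sink connectors or other services.
        itemStock.toStream().to("item.inventory",
                Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```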

We try to make the business scenario easy to demonstrate, either on a developer's laptop with docker compose or on a simple free OpenShift cluster on IBM Cloud.

In real life, an as-is solution includes back-end applications to manage warehouse inventory, connected to a home-built fulfillment application, combined with store applications and servers, an e-commerce suite, and a set of SOA services acting as back-end systems.

as is soa

We may have integration flows doing data mapping; most of those calls are synchronous, and getting the availability of a single item requires many SOAP calls, which increases latency and the risk of failure. There is an interesting video from Scott Havens explaining the need to transition from a synchronous architecture to an event-driven, asynchronous one when scaling is a must. This lab reflects that approach.

Component view

At a high level, Kafka Connect is used to integrate external systems with Kafka. For example, external systems can inject item sale messages into a queue, from which a Kafka source connector publishes the messages to a Kafka topic; those messages are then processed by a series of event-driven microservices down to a final topic, which sink connectors use to send records to other external systems.

component view

All the components of this scenario are ready to run on OpenShift, but we also provide different docker compose files to run all of those components on your local computer. The important body of knowledge in this scenario relates to the programming model we used and to the Kafka Connect configuration and code.

Solution anatomy

  1. The store simulator application is a Quarkus app which generates item sales to different possible messaging middlewares (RabbitMQ, IBM MQ, or directly Kafka). The code of this application is in the https://github.com/ibm-cloud-architecture/refarch-eda-store-simulator repository. If you want to browse the code, the main readme of this project explains how to package and run the app with docker compose and how the code works. The docker image is quay.io/ibmcase/eda-store-simulator/
  2. The item inventory aggregator is a Quarkus application built with the Kafka Streams API. The source code is in the refarch-eda-item-inventory project. Consider it a black box in the context of this scenario: it consumes item events, aggregates them, exposes APIs on top of Kafka Streams interactive queries (a sketch of such an endpoint follows this list), and publishes inventory events to the item.inventory topic.
  3. The store inventory aggregator is a Quarkus application built with the Kafka Streams API. The source code is in the refarch-eda-store-inventory project; its output goes to the store.inventory topic.
  4. The mock-up inventory mainframe application is not implemented; we use the MQ tools to view the messages in the item.inventory MQ queue.
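
As an illustration of the interactive-query APIs mentioned above, here is a minimal sketch of a JAX-RS resource querying a Kafka Streams state store. It is not the actual aggregator code: the state store name item-inventory-store (reused from the earlier topology sketch) and the direct injection of the KafkaStreams instance, as provided by the Quarkus Kafka Streams extension, are assumptions made for this sketch.

```java
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.core.Response;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Path("/api/v1/items")
public class ItemStockResource {

    // With the Quarkus Kafka Streams extension, the running KafkaStreams
    // instance can be injected once the topology has started.
    @Inject
    KafkaStreams streams;

    // Query the local state store materialized by the aggregation topology.
    @GET
    @Path("/{itemID}")
    public Response getStock(@PathParam("itemID") String itemID) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("item-inventory-store",
                        QueryableStoreTypes.keyValueStore()));
        Long stock = store.get(itemID);
        if (stock == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        return Response.ok(stock).build();
    }
}
```

In a deployment with more than one aggregator pod, a given key may live in another instance's local store; the real aggregator projects handle that case by locating the hosting instance through Kafka Streams metadata.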

General pre-requisites

  • Docker and docker compose to run the solution locally.
  • git CLI.
  • Get access to an OpenShift cluster.
  • OpenShift CLI on your local environment.
  • jq on your local environment.
  • Clone the Inventory lab repository:
git clone https://github.com/ibm-cloud-architecture/eda-lab-inventory.git

Lab 1: Demonstrate real-time inventory with AMQ Streams

In this lab we will deploy the solution using AMQ Streams on OpenShift. The diagram looks like the following:

inventory components

The Kafka version used is 2.7, provided by AMQ Streams.

The deployment to OpenShift using the AMQ Streams operator and a GitOps approach is described in a dedicated lab; you can read the details in the Kafka Streams use case section.

Here are the simplest steps to deploy the solution with a minimal configuration:

  1. Log in to the OpenShift cluster (version 4.6): oc login --token=sha256~d... --server=https://....containers.cloud.ibm.com...

  2. Work from the cloned repository: cd eda-lab-inventory

  3. Define the environment variables for your deployment in scripts/env-AMQStreams.sh. You mostly want to change the following variables, or use our default settings:

    YOUR_PROJECT_NAME=rt-inventory

    As of now, the admin user and password are not used; they may be in the future.

  4. Start the deployment with the script ./scripts/deployInventorySolutionWithAMQStreams.sh --skip-login. This should create the AMQ Streams cluster and deploy the four applications.

    oc project rt-inventory
    oc get pods
    NAME READY STATUS RESTARTS AGE
    amq-kafka-cruise-control-6d6bf8b774-99rwl 2/2 Running 0 4d
    amq-kafka-entity-operator-75f7bc8f5c-x4vkt 3/3 Running 0 4d
    amq-kafka-kafka-0 1/1 Running 0 4d
    amq-kafka-kafka-1 1/1 Running 0 4d
    amq-kafka-kafka-2 1/1 Running 0 4d

Demonstration script for the solution

We will first go over the demonstration using the store simulator, then using an end-to-end test script.

  1. Go to the simulator user interface using a route like: https://store-simulator-rt-inventory.dte-ocp46-73awfj-915b3b336cabec458a7c7ec2aa7c625f-0000.us-east.containers.appdomain.cloud

    To get this route use the following command:

    oc get routes store-simulator -o jsonpath="{.spec.host}" && echo
    simulator home

    The simulator will send random sale events for the stores listed in the Stores table (the content may change in a future release):

    stores view
  2. Send some events by selecting the toggle Kafka and then the number of records to send:

    run simulation

    Once started, a table should be displayed presenting the records sent to Kafka.

  3. Let's assess whether we can see the item stock across stores: use the item-aggregator route, something like item-aggregator-rt-inventory.dte-ocp46-73awfj-915b3b336cabec458a7c7ec2aa7c625f-0000.us-east.containers.appdomain.cloud, completed with ‘/q/swagger-ui’ as we want to access the API.

    To get this route use the following command:

    oc get routes item-aggregator -o jsonpath="{.spec.host}" && echo

    Select the GET /api/v1/items/{itemID} operation:

    items stock req

    Use one of the following item ids: [Item_1, Item_2, Item_3, Item_4, Item_5, Item_6, Item_7]. You should get the current stock across stores.

    item stock response
  4. Let's assess a store's stock; for that we access the store aggregator URL: store-aggregator-rt-inventory.dte-ocp46-73awfj-915b3b336cabec458a7c7ec2aa7c625f-0000.us-east.containers.appdomain.cloud with the /q/swagger-ui suffix.

    To get this route use the following command:

    oc get routes store-aggregator -o jsonpath="{.spec.host}" && echo

    Then use GET on api/v1/stores/inventory/{storeID}, and enter one of the available stores: [Store_1, Store_2, Store_3, Store_4, Store_5]

    store stock req

    The response shows the inventory for the selected store.

Lab 2: Deploy the MQ Source Connector

This lab uses the store simulator to generate events to an MQ queue. The component view looks like the following figure:

lab2 mq kafka

The major difference from the previous lab is the addition of the IBM MQ source Kafka connector to move records from the Items queue to the items Kafka topic.

This lab is defined in the MQ to Kafka repository with instructions on how to run it locally or on OpenShift.
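
When the Kafka Connect runtime runs locally (for example with the docker compose files mentioned earlier), a connector instance is created by posting its configuration to the Connect REST API. The sketch below is illustrative only: it assumes a Connect worker at localhost:8083 and uses placeholder MQ values (queue manager QM1, channel DEV.APP.SVRCONN, queue ITEMS); follow the lab's own configuration files and the IBM MQ source connector documentation for the exact property names, and the OpenShift variant that uses a KafkaConnector custom resource.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterMqSourceConnector {
    public static void main(String[] args) throws Exception {
        // Connector configuration: MQ connection details plus the target Kafka topic.
        // Property names follow the ibm-messaging kafka-connect-mq-source connector;
        // the MQ values below are placeholders for this sketch.
        String config = """
            {
              "name": "mq-source",
              "config": {
                "connector.class": "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
                "tasks.max": "1",
                "mq.queue.manager": "QM1",
                "mq.connection.name.list": "ibmmq(1414)",
                "mq.channel.name": "DEV.APP.SVRCONN",
                "mq.queue": "ITEMS",
                "mq.message.body.jms": "true",
                "mq.record.builder": "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder",
                "topic": "items"
              }
            }
            """;

        // POST the configuration to the Kafka Connect REST API (assumed at localhost:8083).
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```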

Lab 3: Deploy the MQ Sink Connector

Following the MQ Sink Connector use case, we can deploy the MQ sink connector to OpenShift, connecting your source Kafka environment and topic to the target MQ queue manager.

! This lab needs a refresh

Lab 4: Deploy the JDBC Sink Connector

Same approach as above, but now we can follow the JDBC Sink Connector use case to deploy the JDBC sink connector runtime, connecting your source Kafka environment and topic to the target DB2 database. This project uses a simple Inventory App to present the content of the DB2 tables. The component view looks like the following diagram:

jdbc sink component view

! This lab needs a refresh

Lab 5: RabbitMQ Source Connector to Kafka items topic

Follow the RabbitMQ Source Connector lab to deploy the Kafka Connect runtime and the RabbitMQ source connector configuration, so it can move messages from the RabbitMQ items queue to the target Kafka items topic.
