IBM Garage Event-Driven Reference Architecture

Getting a starting environment to develop EDA solutions

This note presents how to get development and deployment environments to start building an event-driven microservice solution. We assume deployment on OpenShift and Java as the main programming language.

For the Kafka deployment itself there are several choices, from proprietary distributions like Confluent to open-source Kubernetes operators like Strimzi or Red Hat AMQ Streams; Cloud Pak for Integration provides IBM Event Streams.

Infrastructure for development and integration tests

  • OpenShift
    • OpenShift Container Platform Installation Documentation
    • OpenShift Container Platform is flexible and can be installed in a number of different environments: on-premises, cloud, and hybrid.
    • OpenShift v4.4.x or newer is required for CP4I 2020.2.1 (and therefore Event Streams v10.0) and CP4Apps v4.2.x.
  • Cloud Pak Operators
  • Cloud Pak for Integration
    • Install the Cloud Pak for Integration operator from the OperatorHub catalog.
    • Install the Cloud Pak for Integration Platform Navigator from the OperatorHub catalog.
    • The previous steps should have installed the IBM common services if they were not already present. Get the admin password from the platform-auth-idp-credentials secret in the ibm-common-services project:
      oc get secret platform-auth-idp-credentials -o jsonpath='{.data.admin_password}' | base64 --decode
    • More information on CP4I Installation

Deploying Event Streams

  • The instructions are in the product documentation and are straightforward when using the IBM Event Streams operator. Select a minimal configuration with persistence.

  • Here is an example of the YAML. Note that several sample YAML files are available after you install the Event Streams operator. This one is for Event Streams v10.0:

apiVersion: eventstreams.ibm.com/v1beta1
kind: EventStreams
metadata:
  name: minimal-prod
  namespace: cp4i
spec:
  version: 10.0.0
  license:
    accept: false   # change to true to accept the Event Streams license terms
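
Assuming the custom resource above is saved as es-minimal.yaml (the file name is arbitrary) and the license flag has been switched to true, it can be applied with the CLI:

oc apply -f es-minimal.yaml -n cp4i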

Deploy Strimzi

  • Strimzi is an open-source project that provides Kubernetes operators to deploy and operate Apache Kafka on Kubernetes and OpenShift.
  • On OpenShift Container Platform v4.0.x to v4.3.x, as well as with pre-v10 Event Streams, the Strimzi operator covers most of our needs. It provides the base for custom resources such as KafkaConnect, KafkaConnectS2I, KafkaConnector, MirrorMaker2 and others.
  • You can install it via OperatorHub.
  • You can also install the Strimzi cluster operator by applying YAML files through the CLI, as sketched below.
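
For example, the cluster operator can be installed and a small cluster created along the following lines. This is a sketch only: the install URL, namespace, cluster name and sizing are assumptions, so check the Strimzi documentation for the release you use.

# Install the Strimzi cluster operator, watching the "kafka" namespace
oc create namespace kafka
oc create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Then create a minimal, ephemeral Kafka cluster with a custom resource such as:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

and apply it with oc apply -f kafka-cluster.yaml -n kafka (assuming the resource was saved as kafka-cluster.yaml).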

Java Developer Environment

  • We go with Quarkus, so everything is set up through the Quarkus Maven plugin.
  • You can scaffold your application through the Quarkus Web UI which will allow you to pick and choose your project dependencies. You may also do it through the CLI like so:
mvn io.quarkus:quarkus-maven-plugin:1.8.1.Final:create \
-DprojectGroupId=ibm.garage \
-DprojectArtifactId=your-application \
-Dextensions="kafka,kafka-streams,quarkus-kafka-streams"
  • If you already have your project created and you know the name of an extension you want to add, you can do it through the CLI like the following:
./mvnw quarkus:add-extension -Dextensions="kafka"
  • Here is a very simple Quarkus producer application using MicroProfile Reactive Messaging that sends messages to Event Streams v10 or newer. The imports are shown below, and a sketch of the class body follows:

Producer.java

package com.ibm.garage.infrastructure;
import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;
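
A minimal sketch of the class body (the channel name messages-out and the generated payload are assumptions, not the original sample code):

@ApplicationScoped
public class Producer {

    private final Random random = new Random();

    // Emit one random value per second on the "messages-out" channel,
    // which application.properties maps to the Kafka topic
    @Outgoing("messages-out")
    public Flowable<KafkaRecord<String, String>> generate() {
        return Flowable.interval(1, TimeUnit.SECONDS)
                .map(tick -> KafkaRecord.of("key-" + tick, String.valueOf(random.nextInt(100))));
    }
}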
  • Now we have a simple Quarkus consumer, also using MicroProfile Reactive Messaging, which prints each received message. Again the imports are shown below, and a sketch of the class body follows:

Consumer.java

package com.ibm.garage.infrastructure;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
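
A minimal sketch of the class body (the channel name messages-in is an assumption, and the startup/shutdown observers suggested by the imports above are omitted):

@ApplicationScoped
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Log every record received on the "messages-in" channel,
    // which application.properties maps to the Kafka topic
    @Incoming("messages-in")
    public void receive(String message) {
        LOGGER.info("Received message: {}", message);
    }
}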
  • Quarkus does its configuration via an application.properties file under the src/main/resources/ path. A sample properties file:
quarkus.http.port=8080
quarkus.log.console.enable=true
quarkus.log.console.level=INFO
# Base ES Connection Details
mp.messaging.connector.smallrye-kafka.bootstrap.servers=${BOOTSTRAP_SERVERS}
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.ssl.protocol=TLSv1.2
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512
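
The connector-level settings above are only part of the file; below is a sketch of the remaining entries, assuming the truststore variables listed further down and the messages-out / messages-in channel names used in the producer and consumer sketches above:

mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SCRAM_USERNAME}" password="${SCRAM_PASSWORD}";
mp.messaging.connector.smallrye-kafka.ssl.truststore.location=${CERT_LOCATION}
mp.messaging.connector.smallrye-kafka.ssl.truststore.password=${CERT_PASSWORD}
mp.messaging.connector.smallrye-kafka.ssl.truststore.type=PKCS12

# Outgoing channel used by Producer.java
mp.messaging.outgoing.messages-out.connector=smallrye-kafka
mp.messaging.outgoing.messages-out.topic=${TOPIC_NAME}
mp.messaging.outgoing.messages-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.messages-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Incoming channel used by Consumer.java
mp.messaging.incoming.messages-in.connector=smallrye-kafka
mp.messaging.incoming.messages-in.topic=${TOPIC_NAME}
mp.messaging.incoming.messages-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.messages-in.group.id=quarkus-consumer-group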
  • There are a few environment variables we need to provide before we can run the application. Replace the values below with your own; they can be retrieved from the Event Streams UI on CP4I.
export BOOTSTRAP_SERVERS=your-external-bootstrap-address:xxxx \
export SCRAM_USERNAME=your-scram-username \
export SCRAM_PASSWORD=your-scram-password \
export TOPIC_NAME=your-topic \
export CERT_LOCATION=/your-path-to-cert/es-cert.p12 \
export CERT_PASSWORD=your-cert-password
  • To run your application in development mode, which enables hot reloading (if that is functionality you need), run:
./mvnw quarkus:dev
  • To get up and running quickly on a laptop, you can also use the Strimzi Kafka image with Docker and docker-compose. We have different docker-compose files for you to start with (see the sketch after this list):
    • One broker, one ZooKeeper, Kafka 2.5
    • TODO
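
As an illustration, a single-broker compose file based on the Strimzi Kafka 2.5 image could look like the following sketch (the image tag and ports are assumptions to adapt to your environment):

version: '2'
services:
  zookeeper:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: ["sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties"]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
  kafka:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: ["sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: /tmp/logs
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181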

Python Developer Environment

  • There are a few Python Kafka client packages; the Confluent Kafka Python package (confluent-kafka) serves our needs. Install the dependency with pip:

pip install confluent-kafka

  • The following is a very simple Python producer class that serves as a scaffold. The beginning of the class is shown below, and a sketch of the remaining methods follows:

KafkaProducer.py

import json, os
from confluent_kafka import KafkaError, Producer

class KafkaProducer:

    def __init__(self, groupID = "KafkaProducer"):
        # Get the producer configuration
        self.producer_conf = self.getProducerConfiguration(groupID)
        # Create the producer
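
The class would then continue along the following lines. This is a sketch only: the method names, configuration keys and the use of the KAFKA_* environment variables listed further down are assumptions rather than the original sample code.

        self.producer = Producer(self.producer_conf)

    def getProducerConfiguration(self, groupID):
        # Build the confluent-kafka configuration from environment variables
        return {
            'bootstrap.servers': os.environ['KAFKA_BROKERS'],
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'SCRAM-SHA-512',
            'sasl.username': os.environ['KAFKA_USER'],
            'sasl.password': os.environ['KAFKA_PASSWORD'],
            'ssl.ca.location': os.environ['KAFKA_CERT'],
            'client.id': groupID
        }

    def delivery_report(self, err, msg):
        # Delivery callback: report success or failure for each produced message
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def publishEvent(self, topicName, eventToSend):
        # Serialize the event as JSON, send it and wait for the delivery report
        self.producer.produce(topicName, value=json.dumps(eventToSend).encode('utf-8'), callback=self.delivery_report)
        self.producer.flush()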
  • Now that we have the producer class, we need a small script that actually sends an event:

ProducePlainMessage.py

import argparse
from KafkaProducer import KafkaProducer

if __name__ == '__main__':
    # Parse arguments
    parser = argparse.ArgumentParser(description="Message Producer Example")
    parser.add_argument('-t', dest="topic", required=True, help="Topic name")
    args = parser.parse_args()

    # Create the producer and send one test event
    # (the payload and the publishEvent helper follow the sketch above)
    producer = KafkaProducer()
    producer.publishEvent(args.topic, {"eventType": "test", "message": "hello"})
  • The following is a simple Python consumer class. Again the beginning of the class is shown below, and a sketch of the remaining methods follows:

KafkaConsumer.py

import json, os
from confluent_kafka import Consumer, KafkaError

class KafkaConsumer:

    def __init__(self, topic_name = "kafka-producer", groupID = 'KafkaConsumer', autocommit = True):
        # Get the consumer configuration
        self.consumer_conf = self.getConsumerConfiguration(groupID, autocommit)
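
As with the producer, the rest of the class is a sketch: method names, configuration keys and the KAFKA_* environment variables are assumptions.

        self.consumer = Consumer(self.consumer_conf)
        self.consumer.subscribe([topic_name])

    def getConsumerConfiguration(self, groupID, autocommit):
        # Build the confluent-kafka configuration from environment variables
        return {
            'bootstrap.servers': os.environ['KAFKA_BROKERS'],
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'SCRAM-SHA-512',
            'sasl.username': os.environ['KAFKA_USER'],
            'sasl.password': os.environ['KAFKA_PASSWORD'],
            'ssl.ca.location': os.environ['KAFKA_CERT'],
            'group.id': groupID,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': autocommit
        }

    def pollEvents(self):
        # Poll the subscribed topic and print each message until interrupted
        try:
            while True:
                msg = self.consumer.poll(timeout=10.0)
                if msg is None:
                    continue
                if msg.error():
                    print('Consumer error: {}'.format(msg.error()))
                    continue
                print('Received message: {}'.format(msg.value().decode('utf-8')))
        except KeyboardInterrupt:
            print('Stopping consumer')

    def close(self):
        self.consumer.close()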
  • Now for the plain-message Kafka Python consumer script:

ConsumePlainMessage.py

import argparse
from KafkaConsumer import KafkaConsumer

####################### MAIN #######################
if __name__ == '__main__':
    # Parse arguments
    parser = argparse.ArgumentParser(description="Message Consumer Example")
    parser.add_argument('-t', dest="topic", required=True, help="Topic name")
    args = parser.parse_args()

    # Poll messages until interrupted (pollEvents/close follow the sketch above)
    consumer = KafkaConsumer(topic_name=args.topic)
    consumer.pollEvents()
    consumer.close()
  • Both KafkaProducer.py and KafkaConsumer.py read a few environment variables to build their configuration and connect to an Event Streams instance:
export KAFKA_BROKERS=your-brokers \
export KAFKA_USER=your-scram-username \
export KAFKA_PASSWORD=your-scram-password \
export KAFKA_CERT=your-cert-path
  • To run your producer and send a simple message:
python ProducePlainMessage.py -t <your-topic-name>
  • To consume messages:
python ConsumePlainMessage.py -t <your-topic-name>

Node.js Developer Environment

  • The following code is based on an existing sample application.

  • We will use the node-rdkafka library, so we first need to install the dependency for the project:

npm install node-rdkafka --save
  • Here is a simple Node.js producer application that produces 20 messages to a Kafka topic. The connection options are shown below, and a sketch of the rest of the script follows:

producer.js

var Kafka = require('node-rdkafka');

var kafka_options = {
    //'debug': 'all',
    'metadata.broker.list': process.env.KAFKA_BROKERS,
    'security.protocol': 'sasl_ssl',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': process.env.SCRAM_USERNAME,
    'sasl.password': process.env.SCRAM_PASSWORD,
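
The options object continues with the certificate location and is then used to create the producer. The rest of the script could look like the following sketch (PEM_PATH and TOPIC_NAME match the exports below; everything else is an assumption):

    // CA certificate (PEM) downloaded from the Event Streams UI
    'ssl.ca.location': process.env.PEM_PATH
};

var topicName = process.env.TOPIC_NAME;
var producer = new Kafka.Producer(kafka_options);

producer.connect();

// Once connected, send 20 messages, then flush and disconnect
producer.on('ready', function() {
    for (var i = 0; i < 20; i++) {
        producer.produce(topicName, null, Buffer.from('message ' + i), 'key-' + i, Date.now());
    }
    producer.setPollInterval(100);
    producer.flush(10000, function() {
        producer.disconnect();
    });
});

producer.on('event.error', function(err) {
    console.error('Error from producer: ' + err);
});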
  • Below is a simple Node.js consumer. Again the connection options are shown below, and a sketch of the rest of the script follows:

consumer.js

var Kafka = require('node-rdkafka');

var kafka_options = {
    //'debug': 'all',
    'metadata.broker.list': process.env.KAFKA_BROKERS,
    'security.protocol': 'sasl_ssl',
    'sasl.mechanisms': 'SCRAM-SHA-512',
    'sasl.username': process.env.SCRAM_USERNAME,
    'sasl.password': process.env.SCRAM_PASSWORD,
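
The rest of the script could look like the following sketch (the consumer group name is an assumption):

    // CA certificate (PEM) downloaded from the Event Streams UI
    'ssl.ca.location': process.env.PEM_PATH,
    // Consumer group for this application (name is an assumption)
    'group.id': 'nodejs-consumer-group'
};

var topicName = process.env.TOPIC_NAME;
var consumer = new Kafka.KafkaConsumer(kafka_options, { 'auto.offset.reset': 'earliest' });

consumer.connect();

// Subscribe once connected and start the consume loop
consumer.on('ready', function() {
    consumer.subscribe([topicName]);
    consumer.consume();
});

// Print the value of every received message
consumer.on('data', function(message) {
    console.log('Received: ' + message.value.toString());
});

consumer.on('event.error', function(err) {
    console.error('Error from consumer: ' + err);
});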
  • As with the Python environment, we need a SCRAM username and password and the PEM certificate. Provide the following environment variables so the application can connect to Event Streams:
export KAFKA_BROKERS=your-external-bootstrap-server-address \
export SCRAM_USERNAME=your-scram-username \
export SCRAM_PASSWORD=your-scram-password \
export TOPIC_NAME=your-topic-name \
export PEM_PATH=/path-to-your-pem-certificate/es-cert.pem
  • To run these applications after exporting the necessary environment variables:
node producer.js
node consumer.js

Golang Developer Environment

  • Similar to the Python developer environment, we can use the confluent-kafka-go library.
  • If you are using Go v1.13 or newer, you can get it with Go modules by importing it from GitHub:
    • import "github.com/confluentinc/confluent-kafka-go/kafka"
    • go build ./...
  • Otherwise, if you cannot use Go modules, you can install it manually:
    • go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
    • import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    • Note that our sample producer and consumer use this option.
  • Below is a simple Go Kafka producer that sends seven test messages. The imports and the start of main are shown below, and a sketch of the rest follows:

producer.go

package main

import (
    "fmt"
    "os"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
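
The body of main could then look like the following sketch (the configuration keys follow the confluent-kafka-go API, the environment variable names match the exports below, and the message payload is an assumption):

    // Read connection settings from the environment
    topic := os.Getenv("TOPIC_NAME")

    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": os.Getenv("KAFKA_BROKERS"),
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms":   "SCRAM-SHA-512",
        "sasl.username":     os.Getenv("SCRAM_USERNAME"),
        "sasl.password":     os.Getenv("SCRAM_PASSWORD"),
        "ssl.ca.location":   os.Getenv("PEM_PATH"),
    })
    if err != nil {
        fmt.Printf("Failed to create producer: %v\n", err)
        os.Exit(1)
    }
    defer p.Close()

    // Send seven test messages to the topic
    for i := 0; i < 7; i++ {
        value := fmt.Sprintf("Test message %d", i)
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }, nil)
    }

    // Wait for all delivery reports before exiting
    p.Flush(15 * 1000)
}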
  • We also have a simple consumer application; again a sketch of the body of main follows:

consumer.go

package main

import (
    "fmt"
    "os"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
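
A sketch of the body of main (the consumer group name is an assumption):

    topic := os.Getenv("TOPIC_NAME")

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": os.Getenv("KAFKA_BROKERS"),
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms":   "SCRAM-SHA-512",
        "sasl.username":     os.Getenv("SCRAM_USERNAME"),
        "sasl.password":     os.Getenv("SCRAM_PASSWORD"),
        "ssl.ca.location":   os.Getenv("PEM_PATH"),
        "group.id":          "golang-consumer-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        fmt.Printf("Failed to create consumer: %v\n", err)
        os.Exit(1)
    }
    defer c.Close()

    c.SubscribeTopics([]string{topic}, nil)

    // Poll messages until the process is interrupted
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            fmt.Printf("Consumer error: %v\n", err)
            continue
        }
        fmt.Printf("Received message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    }
}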
  • Like all the previous developer environments (Java, Python and Node.js), we need to set environment variables for our application. Replace the values below with your own.
export KAFKA_BROKERS=your-external-bootstrap-address:443 \
export SCRAM_USERNAME=your-scram-username \
export SCRAM_PASSWORD=your-scram-password \
export PEM_PATH=/path-to-pem-cert/es-cert.pem \
export TOPIC_NAME=your-topic
  • Now you can test the applications.
go run consumer.go
go run producer.go