Kafka to IBM MQ with Kafka Connector
This extended scenario supports several labs, going from simple to more complex, and addresses how to integrate IBM MQ with Event Streams (Kafka) as part of Cloud Pak for Integration, using Kafka Connect with the IBM MQ Kafka connectors.
To run the same labs with Kafka Confluent, see the ibm-cloud-architecture/eda-lab-mq-to-kafka readme file.
- Pre-requisites to all labs
- Lab 1: MQ source to Event Streams Using the Admin Console
- Lab 2: MQ source to Event Streams using GitOps
- Lab 3: MQ Sink from Kafka
- Lab 4: MQ Connector with Kafka Confluent
Audience
We assume readers have a good knowledge of OpenShift: how to log in, navigate the Administrator console, and use the oc and git CLIs.
Pre-requisites to all labs
- Access to an OpenShift Cluster and Console
- Log in to the OpenShift Console and get the access token to use with oc login.
- Access to the git CLI from your workstation
- Clone the lab repository:
git clone https://github.com/ibm-cloud-architecture/eda-lab-mq-to-kafka.git
Lab 1: MQ source to Event Streams Using the Admin Console
In this lab we assume Cloud Pak for Integration is deployed, with an Event Streams cluster created and at least one MQ broker running. If not, see this note to install the Event Streams operator and define a Kafka cluster using the OpenShift Console, and this note to install the MQ operator and create an MQ broker.
The following figure illustrates what we will deploy:

- The store simulator is a separate application, available in the refarch-eda-store-simulator public GitHub repository; its Docker image is accessible from the quay.io registry.
- The Event Streams cluster is named dev.
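Before going further, you can quickly verify that the prerequisite custom resources exist. This is a minimal check, assuming the Cloud Pak for Integration components run in the cp4i namespace and the broker was created with the IBM MQ operator; adjust names and namespaces to your environment:

```shell
# List the Event Streams clusters managed by the Event Streams operator
oc get eventstreams -n cp4i

# List the queue managers managed by the IBM MQ operator
oc get queuemanagers -n cp4i
```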
Get setup
Get the Cloud Pak for Integration admin user password:

```shell
oc get secret platform-auth-idp-credentials -o jsonpath='{.data.admin_password}' -n ibm-common-services | base64 --decode && echo ""
```

Access the Event Streams console:

```shell
oc get routes
# or directly:
oc get route dev-ibm-es-ui -o jsonpath='{.spec.host}' && echo ""
```

Use this URL to access the Event Streams console.
Using the Topic menu, create the items topic with 1 partition, the default retention time, and 3 replicas. Then generate TLS credentials named tls-mq-user with the Produce messages, consume messages and create topics and schemas permissions on the items topic, and on all consumer groups, as we may reuse this user for consumers.
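If you prefer to declare these resources instead of using the console, a KafkaTopic and a KafkaUser custom resource achieve the same result. The sketch below is illustrative and assumes the Event Streams API group eventstreams.ibm.com (v1beta2 here, v1beta1 on older releases) and the cluster label used by the Event Streams operator; with upstream Strimzi you would use kafka.strimzi.io and the strimzi.io/cluster label:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaTopic
metadata:
  name: items
  labels:
    eventstreams.ibm.com/cluster: dev   # must match the Event Streams cluster name
spec:
  partitions: 1
  replicas: 3
---
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaUser
metadata:
  name: tls-mq-user
  labels:
    eventstreams.ibm.com/cluster: dev
spec:
  authentication:
    type: tls                           # the operator generates a secret holding the TLS credentials
  authorization:
    type: simple
    acls:
      - resource: { type: topic, name: items, patternType: literal }
        operation: Write                # produce to the items topic
      - resource: { type: topic, name: items, patternType: literal }
        operation: Read                 # consume from the items topic
      - resource: { type: group, name: '*', patternType: literal }
        operation: Read                 # any consumer group
```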
You can download the certificates, but it is not necessary, as we will use the generated secrets to configure the MQ Kafka connector.
Get the MQ Console route from the namespace where the MQ brokers run:

```shell
oc get routes -n cp4i | grep mq-web
```

Access the MQ Broker Console > Manage to see the configured broker.

Add the local items queue and verify the MQ App channels are defined.
Create an OpenShift project named mq-es-demo with the command:

```shell
oc new-project mq-es-demo
```
Clone the ibm-cloud-architecture/eda-lab-mq-to-kafka git repository:

```shell
git clone https://github.com/ibm-cloud-architecture/eda-lab-mq-to-kafka
```

Update the ConfigMap named store-simulator-cm in the kustomize/apps/store-simulator/overlays folder for the store simulator (or use Workloads > ConfigMaps > Create), and set the MQ_HOST and MQ_CHANNEL parameters:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: store-simulator-cm
data:
  APP_VERSION: 0.0.6
  APP_TARGET_MESSAGING: IBMMQ
  MQ_HOST: store-mq-ibm-mq.cp4i.svc
  MQ_PORT: "1414"
```
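If you are unsure of the value to use for MQ_HOST, you can list the services exposed by the queue manager. This assumes the MQ broker runs in the cp4i namespace; the service ending in -ibm-mq exposes the 1414 listener:

```shell
# Find the queue manager service name to build the MQ_HOST value (<service>.<namespace>.svc)
oc get svc -n cp4i | grep ibm-mq
```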
Deploy Store simulator
Deploy to the OpenShift project:

```shell
oc project mq-es-demo
oc apply -k kustomize/apps/store-simulator/overlays
```

Access the application web interface:
oc get route store-simulator -o jsonpath='{.spec.host}' && echo ""
Deploy Kafka Connect
The Event Streams MQ Source connector product documentation describes what needs to be done in detail. Basically, we have to do two things:
- Build the connector image with the needed jars for all the connectors we want to use. For example, we need the jar files for the MQ connector and the jars for the MQ client API.
- Start a specific connector with the parameters to connect to the external system and to Kafka.
To make the MQ source connector immediately usable for the demonstration, we have built a connector image at quay.io/ibmcase/demomqconnect, and the kafkaconnect.yaml is in this file.
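For reference, a KafkaConnect custom resource of this kind typically looks like the sketch below. This is not the exact content of the lab's kafka-connect.yaml; it is a simplified example assuming the Event Streams API group eventstreams.ibm.com/v1beta2 (v1beta1 on older releases), the dev cluster's bootstrap service and CA secret naming conventions, the tls-mq-user credentials, and the prebuilt image mentioned above:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: mq-source-connect
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"   # let KafkaConnector CRs manage connectors
spec:
  replicas: 1
  image: quay.io/ibmcase/demomqconnect          # image already containing the MQ connector jars
  bootstrapServers: dev-kafka-bootstrap.cp4i.svc:9093
  tls:
    trustedCertificates:
      - secretName: dev-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: tls-mq-user
      certificate: user.crt
      key: user.key
  config:
    group.id: mq-source-connect-cluster
    offset.storage.topic: mq-source-connect-offsets
    config.storage.topic: mq-source-connect-configs
    status.storage.topic: mq-source-connect-status
```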
To deploy the Kafka Connect framework, run:
oc apply -f https://raw.githubusercontent.com/ibm-cloud-architecture/eda-lab-mq-to-kafka/main/kustomize/environment/kconnect/es-mq/kafka-connect.yaml
Then start the MQ source connector:
oc apply -f https://raw.githubusercontent.com/ibm-cloud-architecture/eda-lab-mq-to-kafka/main/kustomize/environment/kconnect/es-mq/mq-src-connector.yaml
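The mq-src-connector.yaml applied above defines a KafkaConnector resource. A trimmed, illustrative sketch of such a resource is shown below; the actual values (queue manager name, channel, connection list, queue) must match your MQ broker configuration, and the label must reference the Kafka Connect cluster created in the previous step:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mq-source
  labels:
    eventstreams.ibm.com/cluster: mq-source-connect   # name of the KafkaConnect resource
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    mq.queue.manager: QM1                              # your queue manager name
    mq.connection.name.list: store-mq-ibm-mq.cp4i.svc(1414)
    mq.channel.name: DEV.APP.SVRCONN                   # the app channel defined on the broker
    mq.queue: ITEMS                                    # the items queue created earlier
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder
    topic: items                                       # target Kafka topic
```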
Demonstration scripts
The solution has multiple stores, as presented in the stores menu.

In the simulator menu, select the IBMMQ backend, then either send some random messages with the left button (after selecting the number of messages to send) or run the predefined scenario with the right button.

In the MQ Broker Console, go to the items queue to see the messages generated from the simulator.

In the Event Streams console, go to the items topic to see the same messages produced by the Kafka MQ source connector.
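To troubleshoot, you can also check the state of the connector from the CLI. This assumes the connector was created as a KafkaConnector resource in the mq-es-demo project; the resource name is illustrative:

```shell
# The READY column should show True once the connector and its task are running
oc get kafkaconnectors -n mq-es-demo

# Detailed status, including task state and any error trace
oc describe kafkaconnector mq-source -n mq-es-demo
```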
Lab 2: MQ source to Event Streams using GitOps
This lab uses a GitOps approach with the OpenShift GitOps product (ArgoCD) to deploy IBM Event Streams, MQ, the store simulator app, and Kafka Connect with a minimum of work. It can be used on an OpenShift cluster without any previously deployed Cloud Pak for Integration operators. Basically, as an SRE you jumpstart the GitOps operator and then start ArgoCD.
Clone the store-mq-gitops repo:

```shell
git clone https://github.com/ibm-cloud-architecture/store-mq-gitops
```

Follow the instructions from the repository's main readme file.
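If the OpenShift GitOps operator is not yet installed on the cluster, a subscription like the following sketch installs it (channel and catalog source may vary with your OpenShift version):

```yaml
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: openshift-gitops-operator
  namespace: openshift-operators
spec:
  channel: latest                      # pick the channel available in your catalog
  installPlanApproval: Automatic
  name: openshift-gitops-operator
  source: redhat-operators
  sourceNamespace: openshift-marketplace
```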
Lab 3: MQ Sink from Kafka
A perhaps less common use of a Kafka connector is the MQ sink connector, which moves data from Kafka to MQ. This lab addresses such an integration. The target deployment looks like the following diagram:
Kafka is running in its own namespace, and we are using Event Streams from Cloud Pak for Integration.
Setup MQ Channel
Open a shell on the remote container, or use the user interface, to define the communication channel to use for the Kafka connection.
- Using the CLI: change the name of the queue manager to reflect what you defined in your MQ configuration.

```shell
runmqsc EDAQMGR1

# Define an app channel using server connection
DEFINE CHANNEL(KAFKA.CHANNEL) CHLTYPE(SVRCONN)

# Set the channel authentication rules to accept connections requiring userid and password
SET CHLAUTH(KAFKA.CHANNEL) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(KAFKA.CHANNEL) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)

# Set identity of the client connections
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
```
- As an alternative, you can use the MQ administration console.
TBD
Setup Kafka Connect Cluster
Connectors can be added to a Kafka Connect environment using OpenShift CLI commands and the source-to-image custom resource. We will use the Strimzi operator console to set up the Kafka Connect environment.
Once you have downloaded the zip file, which contains a YAML manifest, define the configuration for a KafkaConnectS2I instance. The major configuration settings are the server certificate and mutual TLS authentication, something like:
```yaml
spec:
  bootstrapServers: sandbox-rp-kafka-bootstrap.eventstreams.svc:9093
  tls:
    trustedCertificates:
      - secretName: sandbox-rp-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificate: user.crt
```
If you change the name of the connect cluster in the metadata, also modify the spec.template.pod.metadata.annotations.productChargedContainers value accordingly.
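For context, the spec fragment above sits inside a KafkaConnectS2I manifest roughly like the following sketch (the API group is eventstreams.ibm.com/v1beta1 for Event Streams; with upstream Strimzi it would be kafka.strimzi.io, and the names below are illustrative):

```yaml
apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaConnectS2I
metadata:
  name: eda-connect-cluster
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: sandbox-rp-kafka-bootstrap.eventstreams.svc:9093
  tls:
    trustedCertificates:
      - secretName: sandbox-rp-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: sandbox-rp-tls-cred
      certificate: user.crt
      key: user.key
  config:
    group.id: eda-connect-cluster
    config.storage.topic: eda-connect-cluster-configs
    offset.storage.topic: eda-connect-cluster-offsets
    status.storage.topic: eda-connect-cluster-status
```

Note that mutual TLS normally needs both the client certificate and its key, which is why this sketch uses certificateAndKey backed by the sandbox-rp-tls-cred secret copied below.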
The secrets used above need to be accessible from the project where the connector is deployed. The simplest way to do so is to copy the source certificates from the Event Streams project to your current project with commands like:

```shell
oc get secret sandbox-rp-cluster-ca-cert -n eventstreams --export -o yaml | oc apply -f -
oc get secret sandbox-rp-tls-cred -n eventstreams --export -o yaml | oc apply -f -
```
If you do not have a TLS client certificate from a TLS user, use this note to create one.
- Deploy the connector cluster
oc apply -f kafka-connect-s2i.yaml
An instance of this custom resource represents a Kafka Connect distributed worker cluster. In this mode, workload balancing is automatic, scaling is dynamic, and tasks and data are fault-tolerant. Each connector is represented by another custom resource called KafkaConnector.
oc describe kafkaconnects2i eda-connect-cluster
Add the mq-sink connector
The product documentation details the available MQ connectors and the configuration process. Using the Event Streams console, it is quite simple to get a connector configuration as a JSON file. Here is an example of the final form used to generate the JSON file:
- Once the JSON is downloaded, complete the settings (the complete set of connector properties can be seen in the curl output further below):

```json
{
  "name": "mq-sink",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsink.MQSinkConnector",
    "tasks.max": "1",
    "topics": "inventory",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
```
To run the connector within the cluster, we need the connector jar. You can download this jar file from the Event Streams admin console > Toolkit > Add connector to your Kafka Connect environment > Add Connector > IBM MQ Connectors, or, as an alternative, clone the mq-sink code and run the mvn package command under the kafka-connect-mq-sink folder to build the jar. Copy this jar under the cp4i/my-plugins folder.
- Build the connector with the source-to-image component.
With the correct credentials for IBM Event Streams and IBM MQ, Kafka Connect should connect to both services and move data from the configured Event Streams topic to the configured MQ queue. You will see signs of success in the container output (via oc logs, or in the UI):

```shell
+ curl -X POST -H Content-Type: application/json http://localhost:8083/connectors --data @/opt/kafka-connect-mq-sink/config/mq-sink.json
...
{"name":"mq-sink-connector","config":{"connector.class":"com.ibm.eventstreams.connect.mqsink.MQSinkConnector","tasks.max":"1","topics":"inventory","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","mq.queue.manager":"QM1","mq.connection.name.list":"mq-service(1414)","mq.user.name":"admin","mq.password":"passw0rd","mq.user.authentication.mqcsp":"true","mq.channel.name":"KAFKA.CHANNEL","mq.queue":"INVENTORY","mq.message.builder":"com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder","name":"mq-sink-connector"},"tasks":[{"connector":"mq-sink-connector","task":0}],"type":"sink"}
...
[2020-06-23 04:26:26,054] INFO Creating task mq-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:419)
...
[2020-06-23 04:26:26,449] INFO Connection to MQ established (com.ibm.eventstreams.connect.mqsink.JMSWriter:229)
[2020-06-23 04:26:26,449] INFO WorkerSinkTask{id=mq-sink-connector-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:306)
```
You should now have the Kafka Connector MQ Sink running on OpenShift.
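You can also query the Kafka Connect REST API from inside the running pod to confirm the connector and its task are in the RUNNING state. The pod name is a placeholder; replace it with the actual connect pod in your project:

```shell
# <connect-pod> is a placeholder for your Kafka Connect pod name (see oc get pods)
oc exec <connect-pod> -- curl -s localhost:8083/connectors
oc exec <connect-pod> -- curl -s localhost:8083/connectors/mq-sink-connector/status
```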
MQ Sink Connector on virtual or baremetal server, MQ and Event Streams on IBM Cloud
In this second option, we use our own laptop for the bare-metal deployment, but the solution works the same way on a virtual server.
Pre-requisites
We assume that you have an instance of IBM Event Streams already running on IBM Cloud or on OpenShift with at least administrator credentials created. The credentials will be needed later on for configuring the Kafka Connect framework to be able to connect and work with your IBM Event Streams instance. Also, this scenario requires a topic called inventory
in your IBM Event Streams instance. For gathering the credentials and creating the topic required for this scenario, please review the Common pre-requisites. IMPORTANT: if you are sharing your IBM Event Streams instance, append some unique identifier to the topic you create in IBM Event Streams.
Create Local IBM MQ Instance
In this section we are going to use Docker to create a local IBM MQ instance to simulate an IBM MQ instance somewhere in our datacenter.
Create a data directory to mount onto the container:

```shell
mkdir qm1data
```

Run the official IBM MQ Docker image by executing the following command:

```shell
docker run \
  --name mq \
  --detach \
  --publish 1414:1414 \
  --publish 9443:9443 \
  --publish 9157:9157 \
  --volume qm1data:/mnt/mqm \
  --env LICENSE=accept \
  --env MQ_QMGR_NAME=QM1 \
  ibmcom/mq
```

Here our container is called mq, it runs in detached mode (i.e. in the background), it exposes the ports IBM MQ uses for communication, and the image we are actually running is ibmcom/mq.

You can make sure your IBM MQ Docker image is running by listing the Docker containers on your workstation:
```shell
$ docker ps
CONTAINER ID   IMAGE       COMMAND            CREATED         STATUS         PORTS                                                                     NAMES
a7b2a115a3c6   ibmcom/mq   "runmqdevserver"   6 minutes ago   Up 6 minutes   0.0.0.0:1414->1414/tcp, 0.0.0.0:9157->9157/tcp, 0.0.0.0:9443->9443/tcp   mq
```

You should also be able to log into the MQ server on port 9443 (https://localhost:9443) with the default user admin and password passw0rd.
Now we need to configure the queue manager of our local IBM MQ instance in order to define a server connection channel (KAFKA.CHANNEL) with authentication (user admin, password passw0rd) and the queue (INVENTORY) where our messages from Kafka will be sunk. We are going to do it using the IBM MQ CLI.
Get into the IBM MQ Docker container we started above by executing the following command, which gives us an interactive bash terminal:

```shell
docker exec -ti mq bash
```

Now that we are in the container, start the queue manager QM1 by executing:

```shell
strmqm QM1
```

Start the runmqsc tool to configure the queue manager by executing:

```shell
runmqsc QM1
```

Create a server-connection channel called KAFKA.CHANNEL by executing:

```shell
DEFINE CHANNEL(KAFKA.CHANNEL) CHLTYPE(SVRCONN)
```

Set the channel authentication rules to accept connections requiring userid and password by executing:

```shell
SET CHLAUTH(KAFKA.CHANNEL) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(KAFKA.CHANNEL) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
```

Set the identity of the client connections based on the supplied context (the user ID) by executing:

```shell
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
```

Refresh the connection authentication information by executing:

```shell
REFRESH SECURITY TYPE(CONNAUTH)
```

Create the INVENTORY queue for the connector to use by executing:

```shell
DEFINE QLOCAL(INVENTORY)
```

Authorize admin to connect to and inquire on the queue manager by executing:

```shell
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('admin') AUTHADD(CONNECT,INQ)
```

Finally, authorize admin to use the queue by executing:

```shell
SET AUTHREC PROFILE(INVENTORY) OBJTYPE(QUEUE) PRINCIPAL('admin') AUTHADD(ALLMQI)
```

End runmqsc by executing:

```shell
END
```
You should see the following output:
```shell
9 MQSC commands read.
No commands have a syntax error.
One valid MQSC command could not be processed.
```
Exit the container by executing:
exit
If you check your IBM MQ dashboard, you should see your newly created INVENTORY queue:
Create MQ Kafka Connector Sink
The MQ sink connector can be downloaded from this GitHub repository. The GitHub site includes exhaustive instructions and further details on this connector and its usage.
Clone the repository with the following command:
```shell
git clone https://github.com/ibm-messaging/kafka-connect-mq-sink.git
```

Change directory into the kafka-connect-mq-sink directory:

```shell
cd kafka-connect-mq-sink
```

Build the connector using Maven:

```shell
mvn clean package
```

Create a directory (if it does not exist yet) to contain the Kafka Connect framework configuration and cd into it:

```shell
mkdir config
cd config
```

Create a configuration file called connect-distributed.properties for the Kafka Connect framework with the following properties in it:

```properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=REPLACE_WITH_YOUR_BOOTSTRAP_URL
ssl.enabled.protocols=TLSv1.2
ssl.protocol=TLS
ssl.truststore.location=/opt/kafka/config/es-cert.p12
ssl.truststore.password=REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD
ssl.truststore.type=PKCS12
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
```

IMPORTANT: You must replace all occurrences of the following placeholders in the properties file above with the appropriate values for the Kafka Connect framework to work with your IBM Event Streams instance:
- REPLACE_WITH_YOUR_BOOTSTRAP_URL: Your IBM Event Streams bootstrap URL.
- REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD: Your PKCS12 TLS certificate password.
- REPLACE_WITH_YOUR_SCRAM_USERNAME: Your SCRAM service credentials username.
- REPLACE_WITH_YOUR_SCRAM_PASSWORD: Your SCRAM service credentials password (see the SASL example after this list).
- REPLACE_WITH_UNIQUE_IDENTIFIER: A unique identifier so that the resources your Kafka Connect cluster creates on your IBM Event Streams instance don't collide with other users' resources (note: there are 4 placeholders of this type; replace them all).
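For reference, the SCRAM username and password placeholders typically end up in a sasl.jaas.config property such as the sketch below (add matching producer. and consumer. prefixed entries if your template includes them):

```properties
# SCRAM credentials used by the Kafka Connect workers to authenticate against Event Streams
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="REPLACE_WITH_YOUR_SCRAM_USERNAME" \
  password="REPLACE_WITH_YOUR_SCRAM_PASSWORD";
```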
Review the Common pre-requisites instructions if you don’t know how to find out any of the config properties above.
Download the IBM Event Streams TLS certificate so that your local Kafka Connect framework instance can establish secure communication with your IBM Event Streams instance. IMPORTANT: download the PKCS12 certificate. See how to get the certificate in the Common pre-requisites section.
Create a log4j configuration file named connect-log4j.properties based on the template below:

```properties
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.kafka=INFO
```

Back out of the config directory to the kafka-connect-mq-sink directory:

```shell
cd ..
```

Build the Docker image for your Kafka Connect framework. It contains the IBM MQ sink connector and all the properties files you created and tailored earlier, so that your Kafka Connect framework can work with the IBM Event Streams instance we set up previously in this exercise (mind the dot at the end of the command, it is necessary):
```shell
docker build -t kafkaconnect-with-mq-sink:1.3.0 .
```

Run the Kafka Connect MQ sink container:

```shell
docker run \
  --name mq-sink \
  --detach \
  --volume $(pwd)/config:/opt/kafka/config \
  --publish 8083:8083 \
  --link mq:mq \
  kafkaconnect-with-mq-sink:1.3.0
```

Now that we have a Kafka Connect framework running with the configuration to connect to our IBM Event Streams instance and the IBM MQ sink connector jar file in it, the next step is to create a JSON file called mq-sink.json to start up an instance of the IBM MQ sink connector in our Kafka Connect framework, with the appropriate configuration to read messages from the IBM Event Streams topic we want and sink them to the IBM MQ instance running locally. IMPORTANT: If you are sharing your IBM Event Streams instance and followed the instructions in this readme, you should have appended a unique identifier to the name of the topic (inventory) that you created in IBM Event Streams. As a result, modify the line "topics": "inventory" in the following JSON object accordingly.

```json
{
  "name": "mq-sink",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsink.MQSinkConnector",
    "tasks.max": "1",
    "topics": "inventory",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
```

(The complete set of connector properties can be seen in the curl response below.)

The last piece of the puzzle is to tell our Kafka Connect framework to create and start the IBM MQ sink connector based on the configuration in the previous JSON file. To do so, execute the following POST request:

```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@./mq-sink.json"

# The response returns the metadata about the connector
{"name":"mq-sink","config":{"connector.class":"com.ibm.eventstreams.connect.mqsink.MQSinkConnector","tasks.max":"1","topics":"inventory","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","mq.queue.manager":"QM1","mq.connection.name.list":"ibmmq(1414)","mq.user.name":"admin","mq.password":"passw0rd","mq.user.authentication.mqcsp":"true","mq.channel.name":"KAFKA.CHANNEL","mq.queue":"INVENTORY","mq.message.builder":"com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder","name":"mq-sink"},"tasks":[{"connector":"mq-sink","task":0}],"type":"sink"}
```

You can query your Kafka Connect framework to make sure of this. Execute the following command, which should return all the connectors up and running in your Kafka Connect cluster:
```shell
curl localhost:8083/connectors
["mq-sink"]
```
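You can also ask for the status of the connector itself; the connector and its task should both report RUNNING:

```shell
curl localhost:8083/connectors/mq-sink/status
```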
You should now have a working MQ sink connector getting messages from your topic in IBM Event Streams and sending them to your local IBM MQ instance. To check that it is working, we first need to send a few messages to that IBM Event Streams topic and then check our INVENTORY queue in the local IBM MQ instance.
In order to send messages to IBM Event Streams, we are going to use the IBM Event Streams starter application. You can find the instructions either in the IBM Event Streams official documentation or on the IBM Event Streams dashboard:
Follow the instructions to run the IBM Event Streams starter application from your workstation. IMPORTANT: When generating the properties for your IBM Event Streams starter application, choose to connect to an existing topic and select the topic you created previously as part of this exercise, so that the messages we send into IBM Event Streams end up in the appropriate topic being monitored by your IBM MQ sink connector running on the Kafka Connect framework.
Once you have your application running, open it in your web browser, click on Start producing, let the application produce a couple of messages, and then click on Stop producing.

Check that those messages got into the Kafka topic.

Check that the messages in the Kafka topic have reached your INVENTORY queue in your local IBM MQ instance.
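As an alternative to the dashboard, you can check the current depth of the queue from the MQ CLI inside the container (a minimal check, using the objects created earlier in this lab):

```shell
docker exec -ti mq bash
runmqsc QM1
# CURDEPTH shows how many messages are sitting on the queue
DISPLAY QSTATUS(INVENTORY) CURDEPTH
END
exit
```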
You could also inspect the logs of your Kafka Connect Docker container running on your workstation:
```shell
docker logs mq-sink
...
[2021-01-19 19:44:17,110] DEBUG Putting record for topic inventory, partition 0 and offset 0 (com.ibm.eventstreams.connect.mqsink.MQSinkTask:89)
[2021-01-19 19:44:17,110] DEBUG Value schema Schema{STRING} (com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder:60)
[2021-01-19 19:44:18,292] DEBUG Flushing up to topic inventory, partition 0 and offset 1 (com.ibm.eventstreams.connect.mqsink.MQSinkTask:110)
[2021-01-19 19:44:18,292] INFO WorkerSinkTask{id=mq-sink-0} Committing offsets asynchronously using sequence number 636: {inventory-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:346)
[2021-01-19 19:44:18,711] DEBUG Putting record for topic inventory, partition 0 and offset 1 (com.ibm.eventstreams.connect.mqsink.MQSinkTask:89)
[2021-01-19 19:44:18,711] DEBUG Value schema Schema{STRING} (com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder:60)
[2021-01-19 19:44:20,674] DEBUG Putting record for topic inventory, partition 0 and offset 2 (com.ibm.eventstreams.connect.mqsink.MQSinkTask:89)
```
To clean up your environment, you can:

- Remove the connector from your Kafka Connect framework instance (this isn't really needed if you are going to stop and remove the Kafka Connect Docker container):

```shell
curl -X DELETE http://localhost:8083/connectors/mq-sink
```

- Stop both the IBM MQ and Kafka Connect Docker containers running on your workstation:

```shell
docker stop mq mq-sink
```

- Remove both the IBM MQ and Kafka Connect Docker containers from your workstation:
docker rm mq mq-sink
Lab 4: MQ Connector with Kafka Confluent
Clone the ibm-cloud-architecture/eda-lab-mq-to-kafka git repository and follow the readme:

```shell
git clone https://github.com/ibm-cloud-architecture/eda-lab-mq-to-kafka
```