Kafka Connect to IBM COS

Updated 12/10/2021


Now that you have an Event Streams instance installed on Cloud Pak for Integration on top of OpenShift Container Platform, the goal of this story is to show a possible use case for this technology. With IBM Event Streams, we have access to the powerful capabilities of Kafka, in addition to the monitoring and logging capabilities that IBM provides on top of it.

We will create a simple Quarkus (a supersonic, subatomic, Kubernetes-native Java framework) application that uses MicroProfile Reactive Messaging to send a stream of data to our Event Streams/Kafka topic. We will then create a Kafka Connect cluster using the Strimzi Operator. Lastly, we'll send messages to an Event Streams topic from our Quarkus application, which then triggers the IBM COS Connector to pick up the messages and place them into an IBM COS bucket.

Architecture Diagram

Scenario Prerequisites

OpenShift Container Platform Cluster

  • This scenario will assume you have a 4.7+ Cluster as we will make use of Operators.

Cloud Pak for Integration

  • Updated for the 2021.0.x release of Cloud Pak for Integration installed on OpenShift. This story also assumes you have followed the installation instructions for Event Streams outlined in the 2021-3 product documentation or in the Cloud Pak Playbook, and that you have a working Event Streams instance.


  • Java Development Kit (JDK) v1.11+ (Java 11+)


  • The scenario uses Maven v3.6.3


  • Gradle: ideally v4.0+ (Note: the gradle shadowJar command might not work on Java versions newer than Java 8)

An IDE of your choice

  • Visual Studio Code is used in this scenario.


  • Git: we will need to clone repositories.

An IBM Cloud Account (free)

A Reactive messaging producer app

In this section, we are going to create a producer application to send messages into the IBM Event Streams topic we created in the previous section. This will be a Java application based on Quarkus, a supersonic, subatomic Java framework, and it will make use of the MicroProfile Reactive Messaging specification.

You have two choices for this application: reuse the existing code as-is, or start from scratch using the Quarkus CLI.

  • Clone the eda-quickstarts repository.

    git clone
    cd cos-tutorial
    quarkus dev
  • If you want to start on your own, then create the Quarkus project.

    quarkus create app cos-tutorial
    cd cos-tutorial
    quarkus ext add reactive-messaging-kafka mutiny openshift

    and get inspiration from the

  • Review the file.

    • The @Channel annotation indicates that we’re sending to a channel defined with reactive messaging in the
    • The startDemo() function generates n orders and sends them to the channel.
  • The reactive messaging channels are defined in the project configuration. The settings use a Kafka cluster named dev; we encourage you to keep this name, otherwise you will need to modify a lot of YAML files.

  • See the COS tutorial README to run the producer application locally with quarkus dev.
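
The startDemo() idea (generate n orders, send each one to a channel) can be sketched without any framework. This is a minimal sketch under stated assumptions: the Order fields, the JSON shape and the OrderDemo class are hypothetical and not the tutorial's actual code. A Consumer<String> stands in for the MicroProfile Reactive Messaging Emitter that the real application injects with @Channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical order type for illustration; the real application's model may differ.
class Order {
    final String id;
    final String product;
    final int quantity;

    Order(String id, String product, int quantity) {
        this.id = id;
        this.product = product;
        this.quantity = quantity;
    }

    // Hand-rolled JSON keeps the sketch dependency-free.
    String toJson() {
        return String.format(Locale.ROOT,
                "{\"id\":\"%s\",\"product\":\"%s\",\"quantity\":%d}", id, product, quantity);
    }
}

// Hypothetical stand-in for the producer; not the tutorial's class.
class OrderDemo {
    private static final String[] PRODUCTS = {"p01", "p02", "p03"};

    // Generates n orders and hands each serialized order to the sink.
    // In the Quarkus app the sink would be an Emitter<String> bound to a channel with @Channel.
    static List<Order> startDemo(int n, Consumer<String> sink) {
        List<Order> orders = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Order order = new Order(UUID.randomUUID().toString(),
                    PRODUCTS[i % PRODUCTS.length], i + 1);
            orders.add(order);
            sink.accept(order.toJson());
        }
        return orders;
    }
}
```

For a quick local check, OrderDemo.startDemo(5, System.out::println) prints five JSON lines. Swapping the consumer for an emitter is the only change the channel-based version needs.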

Deploy the application

The kustomize folder contains yaml manifests to deploy Event Streams and the application to OpenShift.

  • If you want to deploy Event Streams use the following commands:

    oc new-project eda-cos
    oc apply -k kustomize/services/es
  • Deploy the app to your OpenShift project. Modify the configmap.yaml in the kustomize/apps/eda-cos-demo/base folder to point to your Kafka bootstrap URL:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: eda-cos-demo-cm
    data:
      APP_VERSION: 0.0.2
      # update the entry that holds your Kafka bootstrap URL here

    then deploy:

    oc apply -k kustomize/apps/eda-cos-demo/base
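
Suppose the ConfigMap entries reach the Quarkus container as environment variables (for example through envFrom in the Deployment) and the app reads the Quarkus kafka.bootstrap.servers property. Both points are assumptions about these manifests. In that case, MicroProfile Config resolves the property by also checking environment variable names derived from it. The sketch below reproduces the three lookup candidates from the MicroProfile Config specification: exact name, non-alphanumerics replaced with underscores, then upper-cased. The EnvNames class is hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper mirroring the MicroProfile Config environment-variable lookup order.
class EnvNames {
    // Candidate environment variable names for a config property, in lookup order.
    static List<String> candidates(String property) {
        String sanitized = property.replaceAll("[^A-Za-z0-9_]", "_");
        LinkedHashSet<String> names = new LinkedHashSet<>(); // keeps order, drops duplicates
        names.add(property);
        names.add(sanitized);
        names.add(sanitized.toUpperCase(Locale.ROOT));
        return List.copyOf(names);
    }

    // Returns the value of the first candidate present in the given environment map.
    static Optional<String> lookup(String property, Map<String, String> env) {
        for (String name : candidates(property)) {
            String value = env.get(name);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }
}
```

Calling EnvNames.lookup("kafka.bootstrap.servers", System.getenv()) inside the pod shows whether a KAFKA_BOOTSTRAP_SERVERS entry would be picked up.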

Create an IBM COS Service and COS Bucket

In this section, we are going to see how to create an IBM Cloud Object Storage (IBM COS) service in your IBM Cloud account, and a bucket within that service. We assume you already have an IBM Cloud account; if not, you can sign up for one at IBM Cloud.

  1. Once you are inside your IBM Cloud account, navigate to the Catalog section. In the search bar, type IBM Cloud Object Storage.

    IBM COS Catalog Search
  2. Give your IBM COS service a unique name. Since this is a free account, we can stick with the Lite plan.

    IBM COS Create COS Service
  3. Now that the IBM Cloud Object Storage service is created, navigate to it and create a new bucket. On the Create Bucket screen, pick Custom Bucket.

    IBM COS Custom Bucket
  4. When selecting options for the bucket, give your bucket a unique name. For Resiliency, select Regional. For Location, select the area you want from the drop-down. IMPORTANT: for Storage Class, select Standard. The IBM COS Sink connector does not seem to work well with buckets created with the Smart Tier storage class. Leave everything else as-is and click Create Bucket.

    IBM COS Custom Bucket Settings
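
Bucket names must be unique, and a name that breaks the naming rules is rejected on the Create Bucket screen. A quick local check can catch a bad name before you submit the form. This minimal sketch assumes DNS-style rules: 3 to 63 characters, made of lowercase letters, digits and hyphens, starting and ending with a letter or digit. Confirm the current rules in the IBM COS documentation. Uniqueness can only be checked by IBM COS itself. The BucketNames class is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical validator; the naming rule encoded here is an assumption, not an IBM API.
class BucketNames {
    // Assumed rule: 3-63 chars of [a-z0-9-], first and last character a lowercase letter or digit.
    private static final Pattern VALID =
            Pattern.compile("^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$");

    static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }
}
```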

Create IBM COS Service Credentials

Now that we have created our IBM Cloud Object Storage service and bucket, we need to create a Service Credential so that we can connect to it.

  1. Inside your IBM COS Service, select Service Credentials and then click the New Credential button.

    IBM COS Service Credential
  2. Name your credential and select Manager from the Role: drop-down menu and click Add.

    IBM COS SC Settings
  3. Expand your newly created Service Credential and write down the values for "apikey" and "resource_instance_id". You will need them later in the Apply IBM COS Sink Connector section.

    Expanded Service Cred
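
The resource_instance_id you just noted is the service instance's CRN (Cloud Resource Name). A CRN is a colon-separated string with ten segments: crn:version:cname:ctype:service-name:location:scope:service-instance:resource-type:resource. For a service instance the last two segments are empty, which is why the value ends in "::". This minimal sketch splits a CRN, assuming that ten-segment layout, and pulls out the service name and instance ID. The Crn class and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical CRN helper; assumes the 10-segment IBM Cloud CRN layout.
class Crn {
    // crn:version:cname:ctype:service-name:location:scope:service-instance:resource-type:resource
    private static final int SEGMENTS = 10;

    static List<String> parse(String crn) {
        // limit -1 keeps the trailing empty segments produced by "::"
        String[] parts = crn.trim().split(":", -1);
        if (parts.length != SEGMENTS || !"crn".equals(parts[0])) {
            throw new IllegalArgumentException("Not a 10-segment CRN: " + crn);
        }
        return Arrays.asList(parts);
    }

    static String serviceName(String crn) {
        return parse(crn).get(4);
    }

    static String serviceInstance(String crn) {
        return parse(crn).get(7);
    }
}
```

A CRN copied from the credential should report cloud-object-storage as its service name. When you paste it into YAML later, keep it quoted, because of the trailing colons.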

Set up the Kafka Connect Cluster

In this section, we are going to see how to deploy a Kafka Connect cluster on OpenShift. It will be the engine that runs the source and sink connectors we decide to use for our use case. IMPORTANT: we assume you have deployed your IBM Event Streams instance with an internal TLS-secured listener, which your Kafka Connect cluster will use to connect. For more detail about listeners, check the IBM Event Streams documentation here.

If you inspect your IBM Event Streams instance by executing the following command:


You should see a TLS listener:


Now, follow the next steps in order to get your Kafka Connect cluster deployed:

  1. Go to your IBM Event Streams dashboard and click the Find more on the toolbox option.

  2. Click on the Set up button for the Set up a Kafka Connect environment option.

  3. Click on Download Kafka Connect ZIP button.

  4. This downloads a ZIP file that contains a kafka-connect-s2i.yaml file. Open that YAML file and note the productID and cloudpakId values, as you will need them in the following step.

  5. Instead of using the previous yaml file, create a new kafka-connect-s2i.yaml file with the following contents:

    kind: KafkaConnectS2I
    annotations: "true"
    type: external

    where you will need to replace the following placeholders with the appropriate values for your IBM Event Streams cluster and service credentials:

    • YOUR_KAFKA_CONNECT_CLUSTER_NAME: A name you want to provide your Kafka Connect cluster and resources with.
    • YOUR_INTERNAL_BOOTSTRAP_ADDRESS: This is the internal bootstrap address of your IBM Event Streams instance. You can review how to find this URL here. Use the internal bootstrap address, which should be in the form YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME-kafka-bootstrap.eventstreams.svc:9093.
    • YOUR_TLS_CREDENTIALS_SECRET: This is the name you give to the TLS credentials for your internal IBM Event Streams listener when you click Generate TLS credentials.
    • YOUR_CLUSTER_TLS_CERTIFICATE_SECRET: This is the secret name where IBM Event Streams stores the TLS certificate for establishing secure communications. This secret name is in the form of YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME-cluster-ca-cert. You can always use oc get secrets to list all the secrets.
    • YOUR_PRODUCT_ID: This is the productID value you noted down earlier.
    • YOUR_CLOUDPAK_ID: This is the cloudpakID value you noted earlier.
  6. Deploy your Kafka Connect cluster by executing

    oc apply -f kafka-connect-s2i.yaml
  7. If you list the pods, you should see three new pods: one for the Kafka Connect build task, another for the Kafka Connect deploy task and the actual Kafka Connect cluster pod.

    oc get pods
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build 0/1 Completed 0 18m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy 0/1 Completed 0 17m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-xxxxx 1/1 Running 0 17m
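
Filling in the step 5 placeholders by hand is error-prone, so the substitution can be scripted. This is a minimal sketch, assuming you keep the placeholder names listed in step 5. It derives the internal bootstrap address and the cluster CA secret name from the instance name, using the patterns given there. The namespace is a parameter because eventstreams in the example address may differ for you. The sketch fails if any YOUR_ token is left unreplaced. The ConnectTemplate class and its parameters are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for filling the kafka-connect-s2i.yaml placeholders.
class ConnectTemplate {
    private static final Pattern LEFTOVER = Pattern.compile("YOUR_[A-Z_]+");

    // Builds placeholder values; the naming patterns follow step 5 of this tutorial.
    static Map<String, String> values(String connectName, String esInstance, String namespace,
                                      String tlsSecret, String productId, String cloudpakId) {
        Map<String, String> v = new LinkedHashMap<>();
        v.put("YOUR_KAFKA_CONNECT_CLUSTER_NAME", connectName);
        v.put("YOUR_INTERNAL_BOOTSTRAP_ADDRESS",
                esInstance + "-kafka-bootstrap." + namespace + ".svc:9093");
        v.put("YOUR_TLS_CREDENTIALS_SECRET", tlsSecret);
        v.put("YOUR_CLUSTER_TLS_CERTIFICATE_SECRET", esInstance + "-cluster-ca-cert");
        v.put("YOUR_PRODUCT_ID", productId);
        v.put("YOUR_CLOUDPAK_ID", cloudpakId);
        return v;
    }

    // Replaces every placeholder, then fails if any YOUR_* token survives.
    static String render(String template, Map<String, String> values) {
        String out = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            out = out.replace(e.getKey(), e.getValue());
        }
        Matcher m = LEFTOVER.matcher(out);
        if (m.find()) {
            throw new IllegalStateException("Unreplaced placeholder: " + m.group());
        }
        return out;
    }
}
```

The template can be read with java.nio.file.Files.readString, rendered, and written back before running oc apply.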

Build and Inject IBM COS Sink Connector

The IBM COS Sink Connector source code is available here.

IMPORTANT: Make sure you have Java 8 installed on your workstation and that it is your system's default Java version, since the IBM COS Sink Connector can only be built with that version of Java.

  1. Clone the Kafka Connect IBM COS Sink Connector repository and then change into its folder.

    git clone
    cd kafka-connect-ibmcos-sink/

IMPORTANT Part Two: Depending on the Gradle version installed on your machine, you may need to update the connector's build.gradle file. For example, Gradle v7.x removed the compile dependency configuration, so dependencies declared with compile must be declared with implementation instead. Alternatively, you can downgrade your Gradle version. See the shadowJar plugin repository for versioning information.

  1. Build the connector using Gradle.

    gradle shadowJar
  2. The newly built connector JAR is in the build/libs/ folder. Copy it into a connectors folder for ease of use.

    mkdir connectors
    cp build/libs/kafka-connect-ibmcos-sink-*-all.jar connectors/
  3. Now that we have the connector in the connectors/ folder, we need to embed it into our Kafka Connect cluster. To do that, we trigger another build of our Kafka Connect cluster, this time specifying the files we want embedded. The following command builds a new image with your connectors/plugins and triggers a new deployment of your Kafka Connect cluster.

    oc start-build YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect --from-dir ./connectors/ --follow
  4. Since the last command triggers a new build, we should now see three new pods: the build task, the deploy task and the resulting Kafka Connect cluster. The previous Kafka Connect cluster pod should be gone.

    oc get pods
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build 0/1 Completed 0 31m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy 0/1 Completed 0 31m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-build 0/1 Completed 0 18m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-deploy 0/1 Completed 0 17m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx 1/1 Running 0 17m
  5. Once the new Kafka Connect cluster pod is up and running, exec into it:

    oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx -- bash
  6. Check whether any connector is already set up (there should be none yet):

    curl localhost:8083/connectors
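
The curl check above calls the Kafka Connect REST API. The API listens on port 8083 by default, and GET /connectors returns the registered connector names as a JSON array. The same check can be sketched with the JDK 11 HttpClient. This assumes the REST port is reachable from where the code runs, for example from inside the pod or through oc port-forward. The JSON handling is deliberately naive (no escape handling), and the ConnectorList class is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client for the Kafka Connect REST endpoint GET /connectors.
class ConnectorList {
    private static final Pattern QUOTED = Pattern.compile("\"([^\"]*)\"");

    // baseUrl is e.g. "http://localhost:8083" (assumed reachable from this process).
    static List<String> fetch(String baseUrl) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/connectors")).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return parseNames(response.body());
    }

    // Naive parse of a JSON array of strings such as ["a","b"].
    static List<String> parseNames(String json) {
        List<String> names = new ArrayList<>();
        Matcher m = QUOTED.matcher(json);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }
}
```

At this stage, ConnectorList.fetch("http://localhost:8083") should return an empty list.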

Apply IBM COS Sink Connector

In this section, we are going to see how to set up (apply, or register) the IBM COS Sink Connector we embedded into our Kafka Connect cluster, so that it starts sending messages from our INBOUND topic into our IBM Cloud Object Storage bucket.

  1. Create a new file named kafka-cos-sink-connector.yaml and paste the following code into it.

    kind: KafkaConnector
    metadata:
      name: cos-sink-connector
    spec:
      tasksMax: 1


    • YOUR_KAFKA_CONNECT_CLUSTER_NAME: the name you gave your Kafka Connect cluster earlier.
    • TOPIC_NAME: the name of the topic you created in IBM Event Streams at the beginning of this lab.
    • IBM_COS_API_KEY: your IBM Cloud Object Storage service credentials apikey value. Review the first sections of this lab if you don't remember where to find this value.
    • IBM_COS_BUCKET_LOCATION: your IBM Cloud Object Storage bucket location, usually something like us-east or eu-gb. Review the first sections of this lab if you don't remember where to find this value.
    • IBM_COS_RESILIENCY: your IBM Cloud Object Storage resiliency option (it should be regional). Review the first sections of this lab if you don't remember where to find this value.
    • IBM_COS_CRM: your IBM Cloud Object Storage CRN, which is the resource_instance_id value from your service credentials. It usually ends with a double colon (::). IMPORTANT: keep the double quotation marks around this value, since the CRN contains colons that can collide with YAML syntax.
  2. Apply the YAML, which creates a KafkaConnector custom resource behind the scenes and registers the IBM COS Sink Connector in your Kafka Connect cluster.

    oc apply -f kafka-cos-sink-connector.yaml
  3. The initialization of the connector can take a minute or two. You can check the status of the connector to see whether everything connected successfully.

    oc describe kafkaconnector cos-sink-connector
  4. When the IBM COS Sink connector is successfully up and running you should see something similar to the below.

    IBM COS Sink Connector success
  5. You should also see a new connector being registered if you exec into the Kafka Connect cluster pod and query for the existing connectors again:

    oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx -- bash
    bash-4.4$ curl localhost:8083/connectors
  6. Finally, we can check whether the messages from our IBM Event Streams topic are being propagated to our IBM Cloud Object Storage bucket. If you go to your IBM COS bucket, you should find some objects in it. Each object's name includes its starting and ending offsets. You can download one of these objects to make sure that its content matches the messages in your INBOUND topic.

    End to End Success
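
Besides oc describe, the Kafka Connect REST API exposes GET /connectors/<name>/status. Its JSON response carries a "state" for the connector and one for each task, for example RUNNING or FAILED. From inside the Kafka Connect pod you can fetch it with curl localhost:8083/connectors/cos-sink-connector/status. This minimal sketch extracts those states and reports whether everything is running. It uses a regular expression instead of a JSON library to stay dependency-free, so treat it as a sketch rather than a robust parser. The ConnectorStatus class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker for a Kafka Connect /connectors/<name>/status response body.
class ConnectorStatus {
    private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    // Every "state" value found in the response (connector and tasks).
    static List<String> states(String statusJson) {
        List<String> states = new ArrayList<>();
        Matcher m = STATE.matcher(statusJson);
        while (m.find()) {
            states.add(m.group(1));
        }
        return states;
    }

    // True when at least the connector and one task are present and all report RUNNING.
    static boolean allRunning(String statusJson) {
        List<String> states = states(statusJson);
        return states.size() >= 2 && states.stream().allMatch("RUNNING"::equals);
    }
}
```

Polling allRunning every few seconds until it returns true is one way to wait out the minute or two of initialization.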

Connect with mTLS

Update the file if you are not connecting to Event Streams via the PLAINTEXT security protocol.

# Event Streams Connection details
* `REPLACE_WITH_YOUR_BOOTSTRAP_URL`: Your IBM Event Streams bootstrap url.
* `REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_LOCATION`: The location where you downloaded your PKCS12 TLS certificate to.
* `REPLACE_WITH_YOUR_SCRAM_USERNAME`: Your SCRAM service credentials username.
* `REPLACE_WITH_YOUR_SCRAM_PASSWORD`: Your SCRAM service credentials password.
* `REPLACE_WITH_YOUR_TOPIC`: Name of the topic you created above.

Review the Common pre-requisites instructions if you don’t know how to find out any of the config properties above.
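
For reference, the connection details above map onto standard Kafka client properties for SCRAM authentication over TLS. This minimal sketch assembles them with java.util.Properties. It assumes the SCRAM-SHA-512 mechanism (check the credentials you generated in Event Streams) and a PKCS12 truststore. The truststore password parameter is an assumption, since it is not listed above, and the EventStreamsClientConfig class is hypothetical. In a Quarkus application, the same keys would typically go in application.properties with a kafka. prefix.

```java
import java.util.Properties;

// Hypothetical builder for Kafka client properties: SASL/SCRAM over TLS with a PKCS12 truststore.
class EventStreamsClientConfig {
    static Properties scramOverTls(String bootstrapUrl, String pkcs12Location,
                                   String truststorePassword, String username, String password) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapUrl);
        p.put("security.protocol", "SASL_SSL");
        p.put("sasl.mechanism", "SCRAM-SHA-512"); // assumed mechanism, verify for your instance
        p.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        p.put("ssl.truststore.location", pkcs12Location);
        p.put("ssl.truststore.type", "PKCS12");
        p.put("ssl.truststore.password", truststorePassword); // assumed extra input
        return p;
    }
}
```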