Kafka Connect to IBM COS
- Scenario Prerequisites
- Create Event Streams Topic
- Create Producer Application
- Create an IBM COS Service and COS Bucket
- Create IBM COS Service Credentials
- Set up the Kafka Connect Cluster
- Build and Inject IBM COS Sink Connector
- Apply IBM COS Sink Connector
Now that you have an Event Streams instance installed on Cloud Pak for Integration on top of OpenShift Container Platform, the goal of this story is to show a possible use case for this technology. With IBM Event Streams we get the powerful capabilities of Kafka, plus the monitoring and logging capabilities that IBM provides on top with Event Streams.
We will create a simple Quarkus (a supersonic, subatomic Kubernetes-native framework for Java) application that uses MicroProfile Reactive Messaging to send a stream of data to our Event Streams/Kafka topic. We will then create a Kafka Connect cluster using the Strimzi Operator. Lastly, we'll send messages to an Event Streams topic from our Quarkus application, which triggers the IBM COS Sink Connector to grab the messages and place them into an IBM COS bucket.
OpenShift Container Platform Cluster
- This scenario assumes you have a 4.x cluster, as we will make use of Operators; this walkthrough uses 4.3 specifically.
Cloud Pak for Integration
- This assumes you have at least a 2019.4.1 or 2020.x.x release of the Cloud Pak for Integration installed on OpenShift. This story also assumes you have followed the installation instructions for Event Streams outlined in the 2020-2 product documentation or in the Cloud Pak Playbook, and that you have a working Event Streams instance.
- Java Development Kit (JDK) v1.8+ (Java 8+)
- Maven: this scenario uses Maven v3.6.3
- Gradle: ideally v4.0+ (note: the gradle shadowJar command might not work on Java versions newer than Java 8)
An IDE of your choice
- Visual Studio Code is used in this scenario.
- We will need to clone repositories.
An IBM Cloud Account (free)
- A free (Lite) IBM Cloud Object Storage trial service
In this section, we will need to create a single `INBOUND` topic. The `INBOUND` topic is where our Quarkus application will produce to, and where our IBM Cloud Object Storage Sink Connector will pull data from to send to the COS bucket. Review the Common pre-requisites section to understand how to create a topic in IBM Event Streams.
In this section, we are going to create a producer application to send messages into the IBM Event Streams topic we created in the previous section. This application will be a Java application based on Quarkus, a supersonic, subatomic Java framework, which will also make use of the MicroProfile Reactive Messaging specification.
Create the Quarkus project:

```shell
mvn io.quarkus:quarkus-maven-plugin:1.6.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=quarkus-kafka \
    -Dextensions="kafka,resteasy-jsonb"
```
Create a Java file at the following path: `src/main/java/org/acme/kafka/producer/Producer.java`

In the `Producer.java` file, add the following code. The package and imports are from the lab; the class body below is reconstructed from the description that follows, so treat it as a sketch:

```java
package org.acme.kafka.producer;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.util.Random;
import java.util.concurrent.TimeUnit;

@ApplicationScoped
public class Producer {

    private final Random random = new Random();

    // Send a KafkaRecord<Integer, String> to the INBOUND channel every 5 seconds
    @Outgoing("INBOUND")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        return Flowable.interval(5, TimeUnit.SECONDS)
                .map(tick -> KafkaRecord.of(random.nextInt(100),
                        String.valueOf(random.nextInt(100))));
    }
}
```
What does this code do?

- The `@Outgoing` annotation specifies the name of the channel; the topic defaults to that channel's name if a topic name is not provided in the `application.properties` file. We will address that a little bit later.
- The `@Outgoing` annotation also indicates that we're sending to a channel (or topic) and we're not expecting any data in return.
- The `generate()` function returns an RX Java 2 `Flowable` object emitted every 5 seconds.
- The `Flowable` object returns a `KafkaRecord` with key type `Integer` and value type `String`.
Update the `application.properties` file that was automatically generated at `src/main/resources` when the Quarkus project was created with the following code. The truststore, SASL, and channel entries correspond to the placeholders described below; their property names follow the standard SmallRye Kafka connector conventions:

```properties
quarkus.http.port=8080
quarkus.log.console.enable=true
quarkus.log.console.level=INFO

# Event Streams connection details
mp.messaging.connector.smallrye-kafka.bootstrap.servers=REPLACE_WITH_YOUR_BOOTSTRAP_URL
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
mp.messaging.connector.smallrye-kafka.ssl.protocol=TLSv1.2
mp.messaging.connector.smallrye-kafka.ssl.truststore.location=REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_LOCATION
mp.messaging.connector.smallrye-kafka.ssl.truststore.password=REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD
mp.messaging.connector.smallrye-kafka.ssl.truststore.type=PKCS12
mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="REPLACE_WITH_YOUR_SCRAM_USERNAME" password="REPLACE_WITH_YOUR_SCRAM_PASSWORD";

# Outgoing channel configuration
mp.messaging.outgoing.INBOUND.connector=smallrye-kafka
mp.messaging.outgoing.INBOUND.topic=REPLACE_WITH_YOUR_TOPIC
mp.messaging.outgoing.INBOUND.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.INBOUND.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

Replace the following placeholders with your own values:
- REPLACE_WITH_YOUR_BOOTSTRAP_URL: Your IBM Event Streams bootstrap URL.
- REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_LOCATION: The location where you downloaded your PKCS12 TLS certificate to.
- REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD: Your PKCS12 TLS certificate password.
- REPLACE_WITH_YOUR_SCRAM_USERNAME: Your SCRAM service credentials username.
- REPLACE_WITH_YOUR_SCRAM_PASSWORD: Your SCRAM service credentials password.
- REPLACE_WITH_YOUR_TOPIC: Name of the topic you created above.
Review the Common pre-requisites instructions if you don’t know how to find out any of the config properties above.
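With this many placeholders to fill in, it is easy to leave one behind. As a quick sanity check (a hypothetical helper, not part of the lab itself), you can scan the properties file for any `REPLACE_WITH_` tokens that were never substituted:

```python
import re

def unreplaced_placeholders(properties_text):
    # Collect any REPLACE_WITH_* tokens still present in the file contents.
    return sorted(set(re.findall(r"REPLACE_WITH_[A-Z_]+", properties_text)))

sample = """\
mp.messaging.connector.smallrye-kafka.bootstrap.servers=REPLACE_WITH_YOUR_BOOTSTRAP_URL
mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
"""

print(unreplaced_placeholders(sample))  # -> ['REPLACE_WITH_YOUR_BOOTSTRAP_URL']
```

An empty list means every placeholder has been replaced and the application is ready to run.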
Run the producer application:

```shell
./mvnw quarkus:dev
```
Since the code sends a message every 5 seconds, you can leave it running for a bit. Check that messages are making it into the topic using your IBM Event Streams user interface. You can click a message under “Indexed Timestamp” to see its contents and details.
In this section, we are going to see how to create an IBM Cloud Object Storage (IBM COS) service in your IBM Cloud account and a bucket within that service. We assume you already have an IBM Cloud account; if not, you can sign up for one at IBM Cloud.
Once you are inside your IBM Cloud account, navigate to the `Catalog` section and search for `IBM Cloud Object Storage`.

Name your IBM COS service something unique. Since this is a free account, we can stick with the Lite plan.
Now that the IBM Cloud Object Storage service is created, navigate to it and create a new bucket from the `Create Bucket` screen.

When selecting options for the bucket, name your bucket something unique. For resiliency select `Regional`, and for location select whichever area you want from the drop-down. IMPORTANT: for the storage class, select `Standard`; the IBM COS Sink Connector seems to not play well with buckets created with the `Smart Tier` storage class. Leave everything else as-is and create the bucket.
Now that we have our IBM Cloud Object Storage service and bucket created, we need to create a service credential so that we can connect to it.
Inside your IBM COS service, select `Service Credentials` and then click the button to create a new credential.

Name your credential, select a role from the `Role:` drop-down menu, and create the credential.

Expand your newly created service credential and write down the values for `apikey` and `resource_instance_id`. You will need these later in the Build and Apply IBM COS Sink Connector sections.
In this section, we are going to see how to deploy a Kafka Connect cluster on OpenShift which will be the engine running the source and sink connector we decide to use for our use case. IMPORTANT: We assume you have deployed your IBM Event Streams instance with an internal TLS secured listener which your Kafka Connect cluster will use to connect. For more detail about listeners, check the IBM Event Streams documentation here.
If you inspect your IBM Event Streams instance by executing the following command:

```shell
oc get EventStreams YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME -o yaml
```

you should see that internal TLS-secured listener defined in the instance's spec.
Now, follow the next steps in order to get your Kafka Connect cluster deployed:
Go to your IBM Event Streams dashboard and click on the `Find more on the toolbox` option.

Click on the `Set up` button for the `Set up a Kafka Connect environment` option.

Click the `Download Kafka Connect ZIP` button.
The above downloads a zip file which contains a `kafka-connect-s2i.yaml` file. Open that yaml file and take note of the `productID` and `cloudpakId` values, as you will need these in the following step.
Instead of using the previous yaml file, create a new `kafka-connect-s2i.yaml` file with the following contents. Only the fields from the lab and the placeholder-bearing fields are shown here; treat the `spec` below as a sketch that follows the Event Streams `KafkaConnectS2I` schema, and carry over any remaining annotation values from the downloaded file:

```yaml
apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaConnectS2I
metadata:
  name: YOUR_KAFKA_CONNECT_CLUSTER_NAME
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  logging:
    type: external
  bootstrapServers: YOUR_INTERNAL_BOOTSTRAP_ADDRESS
  template:
    pod:
      metadata:
        annotations:
          productID: YOUR_PRODUCT_ID
          cloudpakId: YOUR_CLOUDPAK_ID
  tls:
    trustedCertificates:
      - secretName: YOUR_CLUSTER_TLS_CERTIFICATE_SECRET
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: YOUR_TLS_CREDENTIALS_SECRET
      certificate: user.crt
      key: user.key
```
where you will need to replace the following placeholders with the appropriate values for your IBM Event Streams cluster and service credentials:
- YOUR_KAFKA_CONNECT_CLUSTER_NAME: A name to give your Kafka Connect cluster and its resources.
- YOUR_INTERNAL_BOOTSTRAP_ADDRESS: The internal bootstrap address of your IBM Event Streams instance. You can review how to find this URL here; make sure you use the internal bootstrap address rather than an external route.
- YOUR_TLS_CREDENTIALS_SECRET: The name you give to your TLS credentials for your internal IBM Event Streams listener when you click on `Generate TLS credentials`.
- YOUR_CLUSTER_TLS_CERTIFICATE_SECRET: The secret name where IBM Event Streams stores the TLS certificate for establishing secure communications. This secret name is in the form of `YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME-cluster-ca-cert`. You can always use `oc get secrets` to list all the secrets.
- YOUR_PRODUCT_ID: The `productID` value you noted down earlier.
- YOUR_CLOUDPAK_ID: The `cloudpakId` value you noted down earlier.
Deploy your Kafka Connect cluster by executing:

```shell
oc apply -f kafka-connect-s2i.yaml
```
If you list the pods, you should see three new pods: one for the Kafka Connect build task, another for the Kafka Connect deploy task, and the actual Kafka Connect cluster pod.

```shell
oc get pods
NAME                                              READY   STATUS      RESTARTS   AGE
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build   0/1     Completed   0          18m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy  0/1     Completed   0          17m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-xxxxx   1/1     Running     0          17m
```
The IBM COS Sink Connector source code is available in this repository here.
IMPORTANT: Make sure you have Java 8 installed on your workstation and that it is the default Java version on your system, since the IBM COS Sink Connector can only be built with that version of Java.
Clone the Kafka Connect IBM COS Sink Connector repository and then change into its folder:

```shell
git clone https://github.com/ibm-messaging/kafka-connect-ibmcos-sink.git
cd kafka-connect-ibmcos-sink/
```
IMPORTANT, part two: Depending on the Gradle version installed on your machine, you will need to update the connector's `build.gradle` file; with Gradle v7.x, for example, the `compile()` method is deprecated and its dependency lines need to use `implementation()` instead. You can downgrade your Gradle version if you so choose. See the shadowJar plugin repository for versioning information.
Build the connector using Gradle's shadowJar task:

```shell
gradle shadowJar
```
The newly built connector binaries are in the `build/libs/` folder. Move them into a `connectors` folder for ease of use:

```shell
mkdir connectors
cp build/libs/kafka-connect-ibmcos-sink-*-all.jar connectors/
```
Now that we have the connector in the `connectors/` folder, we need to embed it into our Kafka Connect cluster. For that, we need to trigger another build for our Kafka Connect cluster, this time specifying the files we want embedded. The following command builds a new image with your provided connectors/plugins and triggers a new deployment of your Kafka Connect cluster:

```shell
oc start-build YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect --from-dir ./connectors/ --follow
```
Since the last command triggers a new build, we should now see three new pods for the build task, the deploy task, and the resulting Kafka Connect cluster. The previous Kafka Connect cluster pod should be gone.

```shell
oc get pods
NAME                                              READY   STATUS      RESTARTS   AGE
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build   0/1     Completed   0          31m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy  0/1     Completed   0          31m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-build   0/1     Completed   0          18m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-deploy  0/1     Completed   0          17m
YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx   1/1     Running     0          17m
```
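If you find yourself repeating this check, the running cluster pod can be picked out of the `oc get pods` output programmatically. The sketch below is a convenience script of our own, not an official client, and assumes the default column layout shown above:

```python
def running_connect_pods(listing, cluster_name):
    # Return names of pods for this Kafka Connect cluster whose STATUS column is Running.
    pods = []
    for line in listing.splitlines():
        fields = line.split()
        if (len(fields) >= 3
                and fields[0].startswith(cluster_name + "-connect")
                and fields[2] == "Running"):
            pods.append(fields[0])
    return pods

listing = """\
NAME                          READY   STATUS      RESTARTS   AGE
my-cluster-connect-2-build    0/1     Completed   0          18m
my-cluster-connect-2-deploy   0/1     Completed   0          17m
my-cluster-connect-2-abc12    1/1     Running     0          17m
"""

print(running_connect_pods(listing, "my-cluster"))  # -> ['my-cluster-connect-2-abc12']
```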
Once the new Kafka Connect cluster pod is up and running, we can exec into it:

```shell
oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx bash
```

and check whether any connector is already set up, which should not be the case:

```shell
curl localhost:8083/connectors
```
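The Kafka Connect REST API returns the connector list as a JSON array, so an empty cluster answers `[]`. If you script this check instead of reading the `curl` output by eye, parsing is a one-liner (a hypothetical helper for illustration):

```python
import json

def registered_connectors(response_body):
    # GET /connectors returns a JSON array of connector names.
    return json.loads(response_body)

print(registered_connectors("[]"))                      # -> []
print(registered_connectors('["cos-sink-connector"]'))  # -> ['cos-sink-connector']
```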
In this section, we are going to see how to set up, apply, or register the IBM COS Sink Connector we embedded into our Kafka Connect cluster so that it starts sending messages from our `INBOUND` topic into our IBM Cloud Object Storage bucket.
Create a new file named `kafka-cos-sink-connector.yaml` and paste the following code into it. The `config` section is sketched from the placeholders described below, with property names following the IBM COS Sink Connector's documented configuration; `YOUR_BUCKET_NAME` is the bucket you created earlier, and `cos.object.records` (how many records go into each object) is shown with an example value:

```yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: cos-sink-connector
  labels:
    eventstreams.ibm.com/cluster: YOUR_KAFKA_CONNECT_CLUSTER_NAME
spec:
  class: com.ibm.eventstreams.connect.cossink.COSSinkConnector
  tasksMax: 1
  config:
    topics: TOPIC_NAME
    cos.api.key: IBM_COS_API_KEY
    cos.bucket.location: IBM_COS_BUCKET_LOCATION
    cos.bucket.name: YOUR_BUCKET_NAME
    cos.bucket.resiliency: IBM_COS_RESILIENCY
    cos.service.crn: "IBM_COS_CRN"
    cos.object.records: 5
```
- YOUR_KAFKA_CONNECT_CLUSTER_NAME: the name you gave previously to your Kafka Connect cluster.
- TOPIC_NAME: the name of the topic you created in IBM Event Streams at the beginning of this lab.
- IBM_COS_API_KEY: your IBM Cloud Object Storage service credentials `apikey` value. Review the first sections of this lab if you don't remember where and how to find this value.
- IBM_COS_BUCKET_LOCATION: your IBM Cloud Object Storage bucket location. Review the first sections of this lab if you don't remember where and how to find this value.
- IBM_COS_RESILIENCY: your IBM Cloud Object Storage resiliency option. This should be `regional`, since that is what we selected when creating the bucket.
- IBM_COS_CRN: your IBM Cloud Object Storage CRN. Review the first sections of this lab if you don't remember where and how to find this value. It usually ends with a double colon (`::`). IMPORTANT: you might need to retain the double quotation marks here, as the CRN contains colons that may collide with yaml syntax.
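Since the CRN is full of colons, an unquoted value can trip up the yaml parser. A tiny sketch of the quoting rule (the CRN below is made up for illustration):

```python
def yaml_safe_crn(crn):
    # Wrap the CRN in double quotes so its colons don't collide with yaml syntax.
    if crn.startswith('"') and crn.endswith('"'):
        return crn
    return '"%s"' % crn

# A made-up CRN; note the typical trailing double colon.
crn = "crn:v1:bluemix:public:cloud-object-storage:global:a/0123456789:my-cos-instance::"
print(yaml_safe_crn(crn))
```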
Apply the yaml, which will create a `KafkaConnector` custom resource behind the scenes and register/set up the IBM COS Sink Connector in your Kafka Connect cluster:

```shell
oc apply -f kafka-cos-sink-connector.yaml
```

The initialization of the connector can take a minute or two. You can check the status of the connector to see if everything connected successfully:

```shell
oc describe kafkaconnector cos-sink-connector
```
When the IBM COS Sink Connector is successfully up and running, the connector's status should show a `Ready` condition.
You should also see the new connector registered if you exec into the Kafka Connect cluster pod and query for the existing connectors again:

```shell
oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx bash
bash-4.4$ curl localhost:8083/connectors
["cos-sink-connector"]
```
Finally, we can check whether the messages from our IBM Event Streams topic are getting propagated to our IBM Cloud Object Storage bucket. If you go to your IBM COS bucket, you should find some files in it. The name of each file in the bucket contains the starting and ending offsets of the messages it holds. You can download one of these object files to make sure that the values inside match the messages in your IBM Event Streams topic.
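For example, if an object name encodes the offsets as `<start>-<end>` (the exact naming scheme in your bucket may differ; this parser is an illustrative sketch), you can recover the offset range and the number of records the object covers:

```python
import re

def offset_range(object_name):
    # Pull the first '<digits>-<digits>' pair out of the object name, assuming
    # a hypothetical 'startoffset-endoffset' naming scheme.
    match = re.search(r"(\d+)-(\d+)", object_name)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))

start, end = offset_range("0000000000000000-0000000000000004")
print(start, end, end - start + 1)  # -> 0 4 5
```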