
Kafka Connect to IBM COS

Overview

Now that you have an Event Streams instance installed on Cloud Pak for Integration on top of OpenShift Container Platform, the goal of this story is to show a possible use case for this technology. With IBM Event Streams we have access to the powerful capabilities of Kafka, in addition to the monitoring and logging capabilities that IBM provides on top of it with Event Streams.

We will create a simple Quarkus (a supersonic, subatomic Kubernetes-native framework for Java) application that uses MicroProfile Reactive Messaging to send a stream of data to our Event Streams/Kafka topic. We will then create a Kafka Connect cluster using the Strimzi Operator. Lastly, we'll send messages to an Event Streams topic from our Quarkus application, which triggers the IBM COS Sink Connector to pick up the messages and place them into an IBM COS bucket.

Architecture Diagram

Scenario Prerequisites

OpenShift Container Platform Cluster

  • This scenario assumes you have a 4.x cluster, as we will make use of Operators; the cluster used here is 4.3 specifically.

Cloud Pak for Integration

  • This scenario assumes you have at least a 2019.4.1 or 2020.x.x release of the Cloud Pak for Integration installed on OpenShift. It also assumes you have followed the installation instructions for Event Streams outlined in the 2020.2 product documentation or the Cloud Pak Playbook and have a working Event Streams instance.

Java

  • Java Development Kit (JDK) v1.8+ (Java 8+)

Maven

  • The scenario uses Maven v3.6.3

Gradle

  • Ideally v4.0+ (Note: the gradle shadowJar command might not work on Java versions newer than Java 8)

An IDE of your choice

  • Visual Studio Code is used in this scenario.

Git

  • We will need to clone repositories.

An IBM Cloud Account (free)

Create Event Streams Topic

In this section, we will need to create a single INBOUND topic. The INBOUND topic is where our Quarkus application will produce to and where our IBM Cloud Object Storage Sink Connector will pull data from to send to the COS Bucket. Review the Common pre-requisites section to understand how to create a topic in IBM Event Streams.
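If you prefer the command line over the Event Streams UI, and assuming you have the IBM Event Streams CLI (the cloudctl es plugin) installed and are logged in to your cluster, the topic can be created with a command along these lines; the topic name INBOUND and the partition/replica counts are only examples:

cloudctl es topic-create --name INBOUND --partitions 1 --replication-factor 3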

Create Producer Application

In this section, we are going to create a producer application to send messages into the IBM Event Streams topic we created in the previous section. This application will be a Java application based on Quarkus, a supersonic subatomic Java framework, which will also make use of the MicroProfile Reactive Messaging specification.

  1. Create the Quarkus project.

    mvn io.quarkus:quarkus-maven-plugin:1.6.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=quarkus-kafka \
    -Dextensions="kafka,resteasy-jsonb"
  2. Create a Producer.java file at the following path.

    src/main/java/org/acme/kafka/producer/Producer.java
    Quarkus Project Folder Structure
  3. Within your Producer.java file, add the following code:

    package org.acme.kafka.producer;
    import io.reactivex.Flowable;
    import io.smallrye.reactive.messaging.kafka.KafkaRecord;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;
    import javax.enterprise.context.ApplicationScoped;
    import java.util.Random;
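    import java.util.concurrent.TimeUnit;

    // The snippet above only captured the imports; the class below is a minimal sketch of the
    // rest of Producer.java based on the description that follows. The channel name ("INBOUND")
    // and the generated values are assumptions, so adjust them to match your configuration.
    @ApplicationScoped
    public class Producer {

        private final Random random = new Random();

        // Emit a new KafkaRecord roughly every 5 seconds on the INBOUND channel
        @Outgoing("INBOUND")
        public Flowable<KafkaRecord<Integer, String>> generate() {
            return Flowable.interval(5, TimeUnit.SECONDS)
                    .map(tick -> KafkaRecord.of(random.nextInt(100), String.valueOf(random.nextInt(100))));
        }
    }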

    The @Outgoing annotation specifies the name of the channel; if no topic name is provided for that channel in the application.properties file, the topic name defaults to the channel name. We will address that a little bit later.

    What does this Producer.java code do?

    • The @Outgoing annotation indicates that we are only producing to a channel (or topic) and not consuming any data.
    • The generate() function returns an RxJava 2 Flowable object that emits a new record every 5 seconds.
    • The Flowable returns a KafkaRecord with a key of type Integer and a value of type String.
  4. Update the application.properties file, which was automatically generated at src/main/resources when the Quarkus project was created, with the following code.

    quarkus.http.port=8080
    quarkus.log.console.enable=true
    quarkus.log.console.level=INFO
    # Event Streams Connection details
    mp.messaging.connector.smallrye-kafka.bootstrap.servers=REPLACE_WITH_YOUR_BOOTSTRAP_URL
    mp.messaging.connector.smallrye-kafka.security.protocol=SASL_SSL
    mp.messaging.connector.smallrye-kafka.ssl.protocol=TLSv1.2
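    # --- The remaining connection and channel settings below are a sketch, assuming SCRAM-SHA-512
    # --- authentication, a PKCS12 truststore, and an outgoing channel named "INBOUND"
    # --- (adjust the channel name if yours differs).
    mp.messaging.connector.smallrye-kafka.ssl.truststore.location=REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_LOCATION
    mp.messaging.connector.smallrye-kafka.ssl.truststore.password=REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD
    mp.messaging.connector.smallrye-kafka.ssl.truststore.type=PKCS12
    mp.messaging.connector.smallrye-kafka.sasl.mechanism=SCRAM-SHA-512
    mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="REPLACE_WITH_YOUR_SCRAM_USERNAME" password="REPLACE_WITH_YOUR_SCRAM_PASSWORD";
    # Outgoing channel -> topic mapping
    mp.messaging.outgoing.INBOUND.connector=smallrye-kafka
    mp.messaging.outgoing.INBOUND.topic=REPLACE_WITH_YOUR_TOPIC
    mp.messaging.outgoing.INBOUND.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
    mp.messaging.outgoing.INBOUND.value.serializer=org.apache.kafka.common.serialization.StringSerializer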
    • REPLACE_WITH_YOUR_BOOTSTRAP_URL: Your IBM Event Streams bootstrap url.
    • REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_LOCATION: The location where you downloaded your PKCS12 TLS certificate to.
    • REPLACE_WITH_YOUR_PKCS12_CERTIFICATE_PASSWORD: Your PKCS12 TLS certificate password.
    • REPLACE_WITH_YOUR_SCRAM_USERNAME: Your SCRAM service credentials username.
    • REPLACE_WITH_YOUR_SCRAM_PASSWORD: Your SCRAM service credentials password.
    • REPLACE_WITH_YOUR_TOPIC: Name of the topic you created above.

    Review the Common pre-requisites instructions if you don’t know how to find out any of the config properties above.

  5. Run the producer application.

    ./mvnw quarkus:dev
  6. Since the code sends a message every 5 seconds, you can leave it running for a bit. Check that messages are making it into the topic using the IBM Event Streams user interface. You can click a message under “Indexed Timestamp” to see its contents and details.

    ES Topic Messages

Create an IBM COS Service and COS Bucket

In this section, we are going to see how to create an IBM Cloud Object Storage (IBM COS) Service in your IBM Cloud account and a bucket within your IBM COS Service. We assume you already have an IBM Cloud account and, if not, you can sign up for one here at IBM Cloud.

  1. Once you are inside your IBM Cloud account, navigate to the Catalog section. In the search bar, type IBM Cloud Object Storage.

    IBM COS Catalog Search
  2. Name your IBM COS Service something unique. Since this is a free account, we can stick with the Lite Plan.

    IBM COS Create COS Service
  3. Now that the IBM Cloud Object Storage Service is created, navigate to it and create a new bucket. On the Create Bucket screen, pick Custom Bucket.

    IBM COS Custom Bucket
  4. When selecting options for the bucket, name your bucket something unique. For Resiliency, select Regional. For Location, select the area you want from the drop-down. IMPORTANT: for Storage Class, select Standard. The IBM COS Sink connector seems to not play well with buckets created with the Smart Tier Storage Class. Leave everything else as-is and hit Create Bucket.

    IBM COS Custom Bucket Settings

Create IBM COS Service Credentials

Now that we have created our IBM Cloud Object Storage Service and bucket, we need to create a Service Credential so that we can connect to it.

  1. Inside your IBM COS Service, select Service Credentials and then click the New Credential button.

    IBM COS Service Credential
  2. Name your credential and select Manager from the Role: drop-down menu and click Add.

    IBM COS SC Settings
  3. Expand your newly created Service Credential and write down the values for "apikey" and "resource_instance_id". You will need these later in the Apply IBM COS Sink Connector section.

    Expanded Service Cred
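As an alternative to the console steps above, and assuming you have the IBM Cloud CLI installed and are logged in, a Manager credential can be created with a command along these lines (the credential name and service instance name below are only examples); the apikey and resource_instance_id values appear in the command output:

ibmcloud resource service-key-create my-cos-credentials Manager --instance-name YOUR_COS_SERVICE_NAME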

Set up the Kafka Connect Cluster

In this section, we are going to see how to deploy a Kafka Connect cluster on OpenShift which will be the engine running the source and sink connector we decide to use for our use case. IMPORTANT: We assume you have deployed your IBM Event Streams instance with an internal TLS secured listener which your Kafka Connect cluster will use to connect. For more detail about listeners, check the IBM Event Streams documentation here.

If you inspect your IBM Event Streams instance by executing the following command:

oc get EventStreams YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME -o yaml

You should see a Tls listener:

connect5
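The exact output depends on how your instance was configured, but the listener portion of the spec should look roughly like the following. This is only a sketch assuming the EventStreams v1beta1 schema; treat your own instance's YAML as the reference.

# Excerpt (sketch) of the EventStreams custom resource showing the internal TLS listener
spec:
  strimziOverrides:
    kafka:
      listeners:
        tls:
          authentication:
            type: tls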

Now, follow the next steps in order to get your Kafka Connect cluster deployed:

  1. Go to your IBM Event Streams dashboard and click on the Find more in the toolbox option.

    Toolbox1
  2. Click on the Set up button for the Set up a Kafka Connect environment option.

    Toolbox2
  3. Click on Download Kafka Connect ZIP button.

    Toolbox3
  4. The above downloads a zip file which contains a kafka-connect-s2i.yaml file. Open that yaml file and take note of the productID and cloudpakId values as you will need these in the following step.

    connect4
  5. Instead of using the previous yaml file, create a new kafka-connect-s2i.yaml file with the following contents:

    apiVersion: eventstreams.ibm.com/v1beta1
    kind: KafkaConnectS2I
    metadata:
      name: YOUR_KAFKA_CONNECT_CLUSTER_NAME
      annotations:
        eventstreams.ibm.com/use-connector-resources: "true"
    spec:
      logging:
        type: external
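      # The remaining fields below are a sketch that follows the structure of the downloaded
      # kafka-connect-s2i.yaml (a Strimzi-style KafkaConnectS2I); keep your downloaded file as
      # the reference for exact field names and only substitute the placeholders.
      bootstrapServers: YOUR_INTERNAL_BOOTSTRAP_ADDRESS
      tls:
        trustedCertificates:
          - secretName: YOUR_CLUSTER_TLS_CERTIFICATE_SECRET
            certificate: ca.crt
      authentication:
        type: tls
        certificateAndKey:
          certificate: user.crt
          key: user.key
          secretName: YOUR_TLS_CREDENTIALS_SECRET
      template:
        pod:
          metadata:
            annotations:
              productID: YOUR_PRODUCT_ID
              cloudpakId: YOUR_CLOUDPAK_ID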

    where you will need to replace the following placeholders with the appropriate values for your IBM Event Streams cluster and service credentials:

    • YOUR_KAFKA_CONNECT_CLUSTER_NAME: A name you want to provide your Kafka Connect cluster and resources with.
    • YOUR_INTERNAL_BOOTSTRAP_ADDRESS: This is the internal bootstrap address of your IBM Event Streams instance. You can review how to find this url here. Use the internal bootstrap address which should be in the form of YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME-kafka-bootstrap.eventstreams.svc:9093: connect6
    • YOUR_TLS_CREDENTIALS_SECRET: This is the name you give to your TLS credentials for your internal IBM Event Streams listener when you click on Generate TLS credentials: connect7
    • YOUR_CLUSTER_TLS_CERTIFICATE_SECRET: This is the secret name where IBM Event Streams stores the TLS certificate for establishing secure communications. This secret name is in the form of YOUR_IBM_EVENT_STREAMS_INSTANCE_NAME-cluster-ca-cert. You can always use oc get secrets to list all the secrets.
    • YOUR_PRODUCT_ID: This is the productID value you noted down earlier.
    • YOUR_CLOUDPAK_ID: This is the cloudpakId value you noted down earlier.
  6. Deploy your Kafka Connect cluster by executing

    oc apply -f kafka-connect-s2i.yaml
  7. If you list the pods, you should see three new pods: one for the Kafka Connect build task, another for the Kafka Connect deploy task and the actual Kafka Connect cluster pod.

    oc get pods
    NAME READY STATUS RESTARTS AGE
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build 0/1 Completed 0 18m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy 0/1 Completed 0 17m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-xxxxx 1/1 Running 0 17m

Build and Inject IBM COS Sink Connector

The IBM COS Sink Connector source code is available at this repository here.

IMPORTANT: Make sure you have Java 8 installed on your workstation and that it is the default Java version on your system, since the IBM COS Sink Connector can only be built with that version of Java.

  1. Clone the Kafka Connect IBM COS Sink Connector repository and then change into its folder.

    git clone https://github.com/ibm-messaging/kafka-connect-ibmcos-sink.git
    cd kafka-connect-ibmcos-sink/

IMPORTANT Part Two: Depending on the Gradle version installed on your machine, you may need to update the connector’s Gradle build file. For example, with Gradle v7.x the build.gradle file should look something like this, as the compile configuration is removed in newer Gradle versions. You can downgrade your Gradle version if you prefer. See the shadowJar repository for versioning information.

/*
* Copyright 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
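
// The rest of build.gradle is not reproduced here. For Gradle 7.x the key change is that
// dependency declarations using the removed compile configuration must use implementation
// instead; the artifact coordinates below are illustrative placeholders only.
dependencies {
    // compile 'org.apache.kafka:connect-api:2.5.0'        // pre-Gradle-7 style (no longer works)
    implementation 'org.apache.kafka:connect-api:2.5.0'    // Gradle 7+ style
}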
  1. Build the connector using Gradle.

    gradle shadowJar
  2. The newly built connector binaries are in the build/libs/ folder. Copy them into a connectors folder for ease of use.

    mkdir connectors
    cp build/libs/kafka-connect-ibmcos-sink-*-all.jar connectors/
  3. Now that we have the connector in the connectors/ folder, we need to embed it into our Kafka Connect cluster. For that, we trigger another build for our Kafka Connect cluster, this time specifying the files we want embedded. The following command builds a new image with your provided connectors/plugins and triggers a new deployment for your Kafka Connect cluster.

    oc start-build YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect --from-dir ./connectors/ --follow
  4. Since the last command triggers a new build, we should now see three new pods: one for the build task, one for the deploy task, and the resulting Kafka Connect cluster pod. Also, the previous Kafka Connect cluster pod should be gone.

    oc get pods
    NAME READY STATUS RESTARTS AGE
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-build 0/1 Completed 0 31m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-1-deploy 0/1 Completed 0 31m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-build 0/1 Completed 0 18m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-deploy 0/1 Completed 0 17m
    YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx 1/1 Running 0 17m
  5. Once the new Kafka Connect cluster pod is up and running, we can exec into it

    oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx bash
  6. to see if there is any connector already set up, which should not be the case

    curl localhost:8083/connectors
    []

Apply IBM COS Sink Connector

In this section, we are going to see how to set up, apply or register the IBM COS Sink Connector we embedded into our Kafka Connect cluster so that it starts sending messages from our INBOUND topic into our IBM Cloud Object Storage bucket.

  1. Create a new file named kafka-cos-sink-connector.yaml and paste the following code in it.

    apiVersion: eventstreams.ibm.com/v1alpha1
    kind: KafkaConnector
    metadata:
      name: cos-sink-connector
      labels:
        eventstreams.ibm.com/cluster: YOUR_KAFKA_CONNECT_CLUSTER_NAME
    spec:
      class: com.ibm.eventstreams.connect.cossink.COSSinkConnector
      tasksMax: 1
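      # A sketch of the connector configuration, assuming the property names documented in the
      # kafka-connect-ibmcos-sink repository; substitute the placeholders described below.
      config:
        topics: TOPIC_NAME
        cos.api.key: IBM_COS_API_KEY
        cos.service.crn: "IBM_COS_CRN"
        cos.bucket.location: IBM_COS_BUCKET_LOCATION
        cos.bucket.name: YOUR_COS_BUCKET_NAME      # the bucket you created earlier (placeholder)
        cos.bucket.resiliency: IBM_COS_RESILIENCY
        cos.endpoint.visibility: public
        cos.object.records: 5
        cos.object.deadline.seconds: 5
        cos.object.interval.seconds: 5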

    where

    • YOUR_KAFKA_CONNECT_CLUSTER_NAME: is the name you gave previously to your Kafka Connect cluster.
    • TOPIC_NAME: is the name of the topic you created in IBM Event Streams at the beginning of this lab.
    • IBM_COS_API_KEY: is your IBM Cloud Object Storage service credentials apikey value. Review first sections of this lab if you don’t remember where and how to find this value.
    • IBM_COS_BUCKET_LOCATION: is your IBM Cloud Object Storage bucket location. Review first sections of this lab if you don’t remember where and how to find this value (it usually is in the form of something like us-east or eu-gb for example).
    • IBM_COS_RESILIENCY: is your IBM Cloud Object Storage resiliency option. Review first sections of this lab if you don’t remember where and how to find this value (it should be regional).
    • IBM_COS_CRN: is your IBM Cloud Object Storage CRN. Review first sections of this lab if you don’t remember where and how to find this value. It usually ends with a double colon (::). IMPORTANT: you might need to retain the double quotation marks here, as the CRN contains colons that may collide with YAML syntax.
  2. Apply the yaml, which will create a KafkaConnector custom resource behind the scenes and register/set up the IBM COS Sink Connector in your Kafka Connect cluster.

    oc apply -f kafka-cos-sink-connector.yaml
  3. The initialization of the connector can take a minute or two. You can check the status of the connector to see if everything connected successfully.

    oc describe kafkaconnector cos-sink-connector
  4. When the IBM COS Sink connector is successfully up and running you should see something similar to the below.

    IBM COS Sink Connector success
  5. You should also see a new connector being registered if you exec into the Kafka Connect cluster pod and query for the existing connectors again:

    oc exec -it YOUR_KAFKA_CONNECT_CLUSTER_NAME-connect-2-xxxxx bash
    bash-4.4$ curl localhost:8083/connectors
    ["cos-sink-connector"]
  6. Finally, we can check whether the messages from our IBM Event Streams topic are getting propagated to our IBM Cloud Object Storage bucket. If you go to your IBM COS bucket, you should find some files in it. The name of each file in the bucket includes the starting and ending offsets of the records it contains. You can download one of these object files to make sure that the values inside match the values in your INBOUND topic.

    End to End Success