IBM Garage Event-Driven Reference Architecture

IBM Event Streams Schema Registry from IBM CloudPak for Integration

This documentation aims to be an introductory hands-on lab on the IBM Event Streams Schema Registry installed through the IBM Cloud Pak for Integration V2020.2.X+ on an OpenShift cluster.


Requirements

This lab requires the following components:

  1. An IBM Event Streams V10 instance installed through the IBM Cloud Pak for Integration V2020.2.X or greater.
  2. An IBM Cloud Shell - https://www.ibm.com/cloud/cloud-shell

IBM Cloud Shell

Here we are going to set up our IBM Cloud Shell with all the tools required to carry out this lab.

Start your IBM Cloud Shell by pointing your browser to https://cloud.ibm.com/shell

shell

IBM Cloud Pak CLI

Cloudctl is a command line tool to manage Container Application Software for Enterprises (CASEs). This CLI will allow us to manage Cloud Pak-related components as well as software installed through any IBM Cloud Pak, such as IBM Event Streams.

In order to install it, execute the following commands in your IBM Cloud Shell:

  1. Download the IBM Cloud Pak CLI - curl -L https://github.com/IBM/cloud-pak-cli/releases/latest/download/cloudctl-linux-amd64.tar.gz -o cloudctl-linux-amd64.tar.gz
  2. Untar it - tar -xvf cloudctl-linux-amd64.tar.gz
  3. Rename it for ease of use - mv cloudctl-linux-amd64 cloudctl
  4. Add it to the PATH environment variable - export PATH=$PATH:$PWD
  5. Make sure your IBM Cloud Pak CLI is in the path - which cloudctl
  6. Make sure your IBM Cloud Pak CLI works - cloudctl help
shell2

Note: If you are not using the IBM Cloud Shell to run the lab, be aware that the cloudctl CLI requires the kubectl CLI. To install the kubectl CLI on your personal environment, follow the instructions here

Event Streams plugin for IBM Cloud Pak CLI

This plugin will allow us to manage IBM Event Streams.

In order to install it, execute the following commands in your IBM Cloud Shell:

  1. Download the Event Streams plugin for IBM Cloud Pak CLI - curl -L http://ibm.biz/es-cli-linux -o es-plugin
  2. Install it - cloudctl plugin install es-plugin
  3. Make sure it works - cloudctl es help
shell3

Git

IBM Cloud Shell comes with Git already installed out of the box.

Vi

IBM Cloud Shell comes with Vi already installed out of the box.

Python 3

IBM Cloud Shell comes with Python 3 already installed out of the box. However, we need to install the modules that will be used later on in this tutorial when we run a Python application to work with Avro, schemas, and messages. These modules are confluent_kafka and avro-python3.

In order to install these modules, execute the following command in your IBM Cloud Shell:

  1. Install the modules - python3 -mpip install avro-python3 confluent_kafka --user
shell4
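
Optionally, you can run a quick sanity check from the Python interpreter (not part of the lab itself) to confirm the freshly installed modules are importable:

# Optional sanity check that the freshly installed modules can be imported
import confluent_kafka
import avro.schema

print("confluent_kafka version:", confluent_kafka.version()[0])
print("avro-python3 import OK")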

Congrats! You now have your IBM Cloud Shell ready to start working.

Schema Registry

diagram

One of the most common technologies used in the industry these days to define, serialize, and deserialize the messages flowing through your Kafka topics is Apache Avro (https://avro.apache.org/docs/current/). To learn more about Apache Avro and how to define Apache Avro data schemas, we strongly recommend reading through our documentation on Avro and data schemas here.
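
If you have not worked with Avro before, the following minimal sketch (using the avro-python3 module we installed earlier; the schema and record are illustrative only and not part of the lab's repository) shows what defining, serializing, and deserializing a message looks like in practice:

import io
import json
import avro.io
import avro.schema

# Define a schema (illustrative, in the same style as the schemas used in this lab)
schema = avro.schema.Parse(json.dumps({
    "type": "record", "name": "demoSchema", "namespace": "schemas.demo",
    "fields": [{"name": "eventKey", "type": "string"},
               {"name": "message", "type": "string"}]}))

# Serialize a record that complies with the schema
buffer = io.BytesIO()
avro.io.DatumWriter(schema).write({"eventKey": "1", "message": "This is a test message"},
                                  avro.io.BinaryEncoder(buffer))

# Deserialize it back using the same schema
buffer.seek(0)
print(avro.io.DatumReader(schema).read(avro.io.BinaryDecoder(buffer)))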

The IBM Event Streams development team has developed a Schema Registry that works alongside your Kafka cluster to provide a place to store descriptions of the message formats used by your producers and consumers. The benefit of storing these descriptions is that you are able to validate that your producing and consuming applications will correctly interoperate. The Schema Registry also allows schemas to evolve over time.

Accessing the Schema Registry

UI

To access the schema registry, we first need to log into IBM Event Streams.

  1. Point your browser to your IBM Event Streams instance’s user interface URL and enter your credentials

    login
  2. Once you are logged into your IBM Event Streams instance, you simply need to click on the Schema Registry button on the main left hand vertical menu bar:

    4

CLI

We can also interact with the Schema Registry through the IBM Event Streams CLI. In order to do so, we first need to log in with the IBM Cloud Pak CLI:

  1. Log into your cluster with the IBM CloudPak CLI

    cloudctl login -a https://cp-console.apps.eda-solutions.gse-ocp.net --skip-ssl-validation
    Username> admin
    Password>
    Authenticating...
    OK
    Targeted account mycluster Account
  2. Initialize the Event Streams CLI plugin

    cloudctl es init
    IBM Cloud Platform Common Services endpoint: https://cp-console.apps.eda-solutions.gse-ocp.net
    Namespace: integration
    Name: es-1
    IBM Cloud Pak for Integration UI address: No instance of Cloud Pak for Integration has been found. Please check that you have access to it.
    Event Streams API endpoint: https://es-1-ibm-es-admapi-external-integration.apps.eda-solutions.gse-ocp.net
    Event Streams API status: OK
    Event Streams UI address: https://es-1-ibm-es-ui-integration.apps.eda-solutions.gse-ocp.net

    (*) The above information will later be used in the IBM Event Streams Credentials section, as it is needed by the Python application we will work with.

  3. Make sure you can access the IBM Event Streams Schema Registry:

    cloudctl es schemas
    No schemas were found.
    OK

Schemas

In this section we will finally get our hands dirty with the IBM Event Streams Schema Registry capability by working with Apache Avro schemas and the Schema Registry.

Create a schema

Let’s see how we can create a schema to start playing with.

UI

The IBM Event Streams user interface allows us to create schemas only from JSON or Avro schema (avsc) files.

  1. Create an Avro schema file avsc with your schema:

    echo '{
      "type":"record",
      "name":"demoSchema_UI_USER1",
      "namespace": "schemas.demo.ui",
      "fields":[
        {"name": "eventKey","type":"string"},
        {"name": "message","type":"string"}]
    }' > demoschema-ui.avsc
  2. On the IBM Event Streams Schema Registry user interface, click the Add schema button in the top right corner.

  3. Click on Upload definition button on the left hand side and select the demoschema-ui.avsc file we just created.

  4. You should now see your Avro schema loaded in the UI with two tabs, definition and preview, to make sure your schema looks as desired:

    5
  5. Click on Add schema button at the top right corner and you should now see that schema listed among your other schemas.

CLI

  1. Create another Avro schema avsc file with a different schema:

    echo '{
      "type":"record",
      "name":"demoSchema_CLI_USER1",
      "namespace": "schemas.demo.cli",
      "fields":[
        {"name": "eventKey","type":"string"},
        {"name": "message","type":"string"}]
    }' > demoschema-cli.avsc
  2. Create a schema by executing the following command:

    cloudctl es schema-add --file demoschema-cli.avsc
    Schema demoSchema_CLI_USER1 is active.
    Version Version ID Schema State Updated Comment
    1.0.0 1 demoSchema_CLI_USER1 active Thu, 25 Jun 2020 11:30:42 UTC
    Added version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
    OK

List schemas

UI

In order to list the schemas in the UI, you simply need to open up the Schema Registry user interface and the schemas will be listed there automatically. You also have a search bar at the top. You can also see more details about a schema by clicking the drop-down arrow on its left:

6

CLI

  1. Execute the following command to list the schemas in your Schema Registry:

    cloudctl es schemas
    Schema State Latest version Latest version ID Updated
    demoSchema_CLI_USER1 active 1.0.0 1 Fri, 24 Jul 2020 13:55:49 UTC
    demoSchema_UI_USER1 active 1.0.0 1 Fri, 24 Jul 2020 13:55:51 UTC
    OK

Delete schemas

UI

  1. Click on the schema you want to delete.

  2. Click on the Manage schema tab at the top.

  3. Click on Remove schema

    7

CLI

To remove a schema using the CLI, simply execute the following command and confirm:

cloudctl es schema-remove demoSchema_CLI_USER1
Remove schema demoSchema_CLI_USER1 and all versions? [y/n]> y
Schema demoSchema_CLI_USER1 and all versions removed.
OK

Create new schema version

To create a new version of a schema,

  1. Let’s first create again the previous two schemas:

    cloudctl es schema-add --file demoschema-ui.avsc
    Schema demoSchema_UI_USER1 is active.
    Version Version ID Schema State Updated Comment
    1.0.0 1 demoSchema_UI_USER1 active Fri, 24 Jul 2020 13:59:55 UTC
    Added version 1.0.0 of schema demoSchema_UI_USER1 to the registry.
    OK
    cloudctl es schema-add --file demoschema-cli.avsc
    Schema demoSchema_CLI_USER1 is active.
    Version Version ID Schema State Updated Comment
    1.0.0 1 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:00:45 UTC
    Added version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
    OK
  2. Add a new attribute to the schemas by editing their Avro schema avsc files:

    cat demoschema-ui.avsc
    {
      "type":"record",
      "name":"demoSchema_UI_USER1",
      "namespace": "schemas.demo.ui",
      "fields":[
        {"name": "eventKey","type":"string"},
        {"name": "message","type":"string"},
        {"name": "attribute1","type":"string"}]
    }

UI

  1. Click on the schema you want to create a new version for.

  2. Click on the Add new version button on the left hand side.

  3. Click on Upload definition button on the left hand side.

  4. Select the Avro schema avsc file and click ok.

    8

Full compatibility for data schemas means that messages that have been serialized with an earlier version of a schema can be deserialized with a later version. To be compatible, fields in later versions of a schema cannot be removed, and any new schema field must have a default value. More on data schema compatibility in the Data Evolution section towards the end of this lab.

As explained in the error notification above, we need to add a default value for the new attribute in our data schema, so that messages serialized with an older version of the data schema, which won’t contain this new attribute, can later be deserialized with the newer version of the data schema that expects it. By providing a default value, we allow deserializers to consume messages that do not contain newer attributes.
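
To make this concrete, here is a minimal sketch (using the avro-python3 module installed earlier; schema and field names are illustrative, not the lab's exact files) of why the default value matters: a record written with the old schema can still be read with the newer schema because the reader fills in the default.

import io
import json
import avro.io
import avro.schema

# Old schema: the two original fields only
old_schema = avro.schema.Parse(json.dumps({
    "type": "record", "name": "demoSchema", "namespace": "schemas.demo",
    "fields": [{"name": "eventKey", "type": "string"},
               {"name": "message", "type": "string"}]}))

# New schema: adds attribute1 with a default value
new_schema = avro.schema.Parse(json.dumps({
    "type": "record", "name": "demoSchema", "namespace": "schemas.demo",
    "fields": [{"name": "eventKey", "type": "string"},
               {"name": "message", "type": "string"},
               {"name": "attribute1", "type": "string", "default": ""}]}))

# Serialize a record with the old schema (it has no attribute1)
buffer = io.BytesIO()
avro.io.DatumWriter(old_schema).write({"eventKey": "1", "message": "hello"},
                                      avro.io.BinaryEncoder(buffer))

# Deserialize it with the new schema: attribute1 is filled with its default
buffer.seek(0)
record = avro.io.DatumReader(old_schema, new_schema).read(avro.io.BinaryDecoder(buffer))
print(record)  # {'eventKey': '1', 'message': 'hello', 'attribute1': ''}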

  1. Add a default value for the new attribute:

    cat demoschema-ui.avsc
    {
      "type":"record",
      "name":"demoSchema_UI_USER1",
      "namespace": "schemas.demo.ui",
      "fields":[
        {"name": "eventKey","type":"string"},
        {"name": "message","type":"string"},
        {"name": "attribute1","type":"string","default":""}]
    }
  2. Repeat the steps for adding a new version of a schema above.

  3. This time you should see that the schema is valid:

    9
  4. However, it still does not let us add this new version to the data schema until we actually provide a version for it. Click on the Add + link on the right of the version attribute of the schema and give it 2.0.0 for example (hit enter for the version to take the value you type in).

  5. Click on Add schema.

  6. You should now see the two versions for your data schema on the left hand side.

    10
  7. If you go back to the Schema Registry page where all your schemas are listed, you should see that the latest version of your data schema is now 2.0.0.

CLI

  1. If we try to add the new version of the schema from its demoschema-cli.avsc Avro schema file, we will get the same error as in the previous UI example:

    cloudctl es schema-add --file demoschema-cli.avsc
    FAILED
    Event Streams API request failed:
    Error response from server. Status code: 400. Avro schema is not compatible with latest schema version: Compatibility type 'MUTUAL_READ' does not hold between 1 schema(s) in the chronology because: Schema[0] has incompatibilities: ['READER_FIELD_MISSING_DEFAULT_VALUE: attribute1' at '/fields/2'].
    Unable to add version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
  2. Add the default value for the new attribute in your Avro schema avsc file and try to add that new version of the schema:

    cloudctl es schema-add --file demoschema-cli.avsc
    FAILED
    Event Streams API request failed:
    Error response from server. Status code: 409. Schema version name already exists
    Unable to add version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
  3. We see that we still have an error because we have not specified a new version value. Specify a new version value when adding this new version of the schema:

    cloudctl es schema-add --file demoschema-cli.avsc --version 2.0.0
    Schema demoSchema_CLI_USER1 is active.
    Version Version ID Schema State Updated Comment
    1.0.0 1 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:00:45 UTC
    2.0.0 2 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:09:37 UTC
    Added version 2.0.0 of schema demoSchema_CLI_USER1 to the registry.

Get latest version of a schema

UI

In order to see the latest version of a data schema using the UI, we just need to go to the Schema Registry web user interface and click on the expand arrow button on the left:

11

CLI

In order to see the latest version of a data schema using the CLI, we simply need to run the following command:

cloudctl es schema demoSchema_CLI_USER1 --version 2
{
"type": "record",
"name": "demoSchema_CLI_USER1",
"namespace": "schemas.demo.cli",
"fields": [
{
"name": "eventKey",

(*) The version you specify is actually the version ID (2) rather than the version name we gave to the newer schema version (2.0.0):

cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.
Version Version ID Schema State Updated Comment
1.0.0 1 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:00:45 UTC
2.0.0 2 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:09:37 UTC
OK

Get specific version of a schema

UI

To see a specific version of a schema, go to the Schema Registry web user interface and click on the schema you want to see the versions for. You will see how many versions of the schema exist, and you can click on any of them to see more details.

12

CLI

To see a specific version of a schema using the CLI, simply run the following command with the version ID you would like to retrieve:

cloudctl es schema demoSchema_CLI_USER1 --version 1
{
"type": "record",
"name": "demoSchema_CLI_USER1",
"namespace": "schemas.demo.cli",
"fields": [
{
"name": "eventKey",

Listing all versions of a schema

UI

To list all versions of a schema in the Schema Registry user interface, you simply need to click on the data schema you want and a new page will display them:

12

CLI

In order to display all versions of a schema, run the following command:

cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.
Version Version ID Schema State Updated Comment
1.0.0 1 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:00:45 UTC
2.0.0 2 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:09:37 UTC
OK

Deleting a version of a schema

UI

In order to delete a version of a schema using the Schema Registry user interface,

  1. Click on the data schema you want to delete a version of.

  2. Select the version you want to delete on the left hand side.

  3. Click on the Manage version button in the top right corner of the main box in the center of the page.

  4. Click on Remove version.

    13

CLI

In order to delete a version of a schema through the CLI, execute the following command:

cloudctl es schema-remove demoSchema_CLI_USER1 --version 1
Remove version with ID 1 of schema demoSchema_CLI_USER1? [y/n]> y
Version with ID 1 of schema demoSchema_CLI_USER1 removed.
OK

We can see only version 2 now:

cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.
Version Version ID Schema State Updated Comment
2.0.0 2 demoSchema_CLI_USER1 active Fri, 24 Jul 2020 14:09:37 UTC
OK

IBM Event Streams Credentials

We have seen how to interact with the IBM Event Streams Schema Registry in order to create, delete, update, etc. the schemas that our applications will theoretically use for data correctness and application robustness. However, the first thing that we need to set up in our IBM Event Streams instance is these applications’ service credentials, so that they are able to interact with IBM Event Streams and its Schema Registry. For doing so, we can use either the GUI or the CLI.

GUI

  1. Go to your IBM Event Streams instance console

    1
  2. Click on Connect to this cluster

    2

In this panel, you will find

  1. The Bootstrap server to connect your applications to in order to send and receive messages from your IBM Event Streams instance. We can see we have one external listener (with SCRAM-SHA authentication) and one internal listener (depending on your IBM Event Streams installation, you might have different listeners and authentication mechanisms).

  2. The Schema Registry URL your applications will need to work with Apache Avro data schemas.

  3. A Generate SCRAM credentials button to generate the SCRAM credentials your applications will need to authenticate with.

  4. A Certificates section to download either the Java PKCS12 or the PEM certificate (or both) that your applications will need in order to be able to establish communication with your IBM Event Streams instance.

    3

To generate the SCRAM credentials your application needs to authenticate against IBM Event Streams, so that it can produce and consume messages as well as create, delete, etc. topics and schemas, we need to create a KafkaUser (this happens behind the scenes) for which we will set some permissions and get the corresponding SCRAM username and password to be used in our applications’ Kafka client configuration:

  1. Click on Generate SCRAM credentials

  2. Enter a user name for your credentials and click next (leave the last option selected: Produce messages, consume messages and create topics and schemas so that we give full access to our user for simplicity)

    scram1
  3. Select all topics and click next

    scram2
  4. Select all consumer groups and click next

    scram3
  5. Select all transactional IDs and click next

    scram4

Once you have created your new KafkaUser, you get the SCRAM credentials displayed on the screen:

scram5

CLI

  1. Log into your cluster with the IBM CloudPak CLI

    cloudctl login -a https://cp-console.apps.eda-solutions.gse-ocp.net --skip-ssl-validation
    Username> admin
    Password>
    Authenticating...
    OK
    Targeted account mycluster Account
  2. Initialize the Event Streams CLI plugin

    cloudctl es init
    IBM Cloud Platform Common Services endpoint: https://cp-console.apps.eda-solutions.gse-ocp.net
    Namespace: integration
    Name: es-1
    IBM Cloud Pak for Integration UI address: No instance of Cloud Pak for Integration has been found. Please check that you have access to it.
    Event Streams API endpoint: https://es-1-ibm-es-admapi-external-integration.apps.eda-solutions.gse-ocp.net
    Event Streams API status: OK
    Event Streams UI address: https://es-1-ibm-es-ui-integration.apps.eda-solutions.gse-ocp.net

We can see above the Event Streams bootstrap address and Schema Registry URL that our applications will need in order to connect to this Event Streams instance.

To be able to establish communication and authenticate against your IBM Event Streams instance, you will need the PEM certificate and the SCRAM credentials:

  1. To download your PEM certificate, you can use the following command:

    cloudctl es certificates --format pem
    Certificate successfully written to /home/ALMARAZJ/es-cert.pem.
    OK
  2. To generate your SCRAM credentials, we first need to create a KafkaUser. You can use the following command:

    cloudctl es kafka-user-create --name my-user1 --consumer --producer --schema-topic-create --all-topics --all-groups --all-txnids --auth-type scram-sha-512
    KafkaUser name Authentication Authorization Username Secret
    my-user1 scram-sha-512 simple EntityOperator has not created corresponding username EntityOperator has not created corresponding secret
    Resource type Name Pattern type Host Operation
    topic * literal * Read
    topic __schema_ prefix * Read
    topic * literal * Write

When a KafkaUser custom resource is created, the Entity Operator within Event Streams will create the principal in ZooKeeper with appropriate ACL entries. It will also create a Kubernetes Secret that contains the Base64-encoded SCRAM password for the scram-sha-512 authentication type, or the Base64-encoded certificates and keys for the tls authentication type.

  1. Retrieve the username and the secret name containing the password of your SCRAM credentials for your KafkaUser:

    oc get kafkauser my-user1 --namespace integration -o jsonpath='{"username: "}{.status.username}{"\nsecret-name: "}{.status.secret}{"\n"}'
    username: my-user1
    secret-name: my-user1
  2. Retrieve the password of your SCRAM credentials from the secret above:

    oc get secret my-user1 --namespace integration -o jsonpath='{.data.password}' | base64 --decode
    *****

Environment variables

Now that we have generated the appropriate IBM Event Streams credentials for applications to be able to establish communication and authenticate against our IBM Event Streams instance, we are going to set some environment variables that will be used by our Python application:

  1. KAFKA_BROKERS which should take the value of the bootstrap server:

    export KAFKA_BROKERS=es-1-kafka-bootstrap-integration.apps.eda-solutions.gse-ocp.net:443
  2. SCRAM_USERNAME which should take the value of the SCRAM username you have generated:

    export SCRAM_USERNAME=my-user1
  3. SCRAM_PASSWORD which should take the value of the SCRAM password you have generated:

    export SCRAM_PASSWORD=*****
  4. PEM_CERT which should take the value of the location where the PEM certificate is within your IBM Cloud Shell:

    export PEM_CERT=~/es-cert.pem

    (*) Don’t forget to either download the PEM certificate to your IBM Cloud Shell through the CLI or upload it to your IBM Cloud Shell from your laptop if you used the UI to get the certificate. Review the previous section if needed.

  5. SCHEMA_REGISTRY_URL which should be a combination of the SCRAM username, the SCRAM password and the Schema Registry url in the form of:

    https://<SCRAM_username>:<SCRAM_password>@<Schema_Registry_url>

    export SCHEMA_REGISTRY_URL=https://${SCRAM_USERNAME}:${SCRAM_PASSWORD}@es-1-ibm-es-schema-external-integration.apps.eda-solutions.gse-ocp.net
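
As a quick, optional check (not part of the lab's repository), you can confirm from Python that these environment variables are visible before running the application:

import os

# Print the variables the Python application expects, masking sensitive values
for name in ("KAFKA_BROKERS", "SCRAM_USERNAME", "SCRAM_PASSWORD", "PEM_CERT", "SCHEMA_REGISTRY_URL"):
    value = os.environ.get(name, "<not set>")
    if name in ("SCRAM_PASSWORD", "SCHEMA_REGISTRY_URL"):
        value = "*****"  # the Schema Registry URL embeds the SCRAM password
    print(name, "=", value)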

Python Application

The Python application we have built to see how to produce and consume messages (either plain messages or Avro encoded messages based on Avro Data Schemas) to and from an IBM Event Streams instance installed through the IBM Cloud Pak for Integration is public at the following GitHub repo: https://github.com/ibm-cloud-architecture/refarch-eda-tools/tree/master/labs/es-cp4i-schema-lab-v10

Clone

In order to use and work with this Python application, the first thing we need to do is to clone the GitHub repository where it is published.

  1. Clone the GitHub repository on your workstation in the location of your choice:

    git clone https://github.com/ibm-cloud-architecture/refarch-eda-tools.git
    Cloning into 'refarch-eda-tools'...
    remote: Enumerating objects: 185, done.
    remote: Counting objects: 100% (185/185), done.
    remote: Compressing objects: 100% (148/148), done.
    remote: Total 185 (delta 23), reused 176 (delta 16), pack-reused 0
    Receiving objects: 100% (185/185), 6.17 MiB | 4.61 MiB/s, done.
    Resolving deltas: 100% (23/23), done.
  2. Change directory into refarch-eda-tools/labs/es-cp4i-schema-lab-v10 to find the assets we will be working with from now on: the Python demo environment and a few other scripts/applications:

    cd refarch-eda-tools/labs/es-cp4i-schema-lab-v10
    $ ls -all
    total 240
    drwxr-xr-x 9 user staff 288 20 May 19:33 .
    drwxr-xr-x 3 user staff 96 20 May 19:33 ..
    -rw-r--r-- 1 user staff 112578 20 May 19:33 README.md
    drwxr-xr-x 5 user staff 160 20 May 19:33 avro_files
    drwxr-xr-x 6 user staff 192 20 May 19:33 kafka

In the next sections, we are going to briefly explain the implementation of this Python application so that you understand what is being done behind the scenes and more importantly, if you are a developer, how to do so.

Python Avro Producer

In this section we describe the Python scripts we will be using in order to be able to produce Avro messages to a Kafka topic.

Produce Message

The Python script that we will use to send an Avro message to a Kafka topic is ProduceAvroMessage.py, where we have the following:

  1. A function to parse the arguments:

    def parseArguments():
        global TOPIC_NAME
        print("The arguments for this script are: " , str(sys.argv))
        if len(sys.argv) == 2:
            TOPIC_NAME = sys.argv[1]
        else:
            print("[ERROR] - The produceAvroMessage.py script expects one argument: The Kafka topic to publish the message to")
            exit(1)
  2. A function to create the event to be sent:

    def createEvent():
        print('Creating event...')
        key = {"key": 1}
        value = {"message" : "This is a test message"}
        print("DONE")
        return json.dumps(value), json.dumps(key)
  3. The main where we will:

    1. Parse the arguments
    2. Get the Avro schemas for the key and value of the event
    3. Create the Event to be sent
    4. Print it out for reference
    5. Create the Kafka Avro Producer and configure it
    6. Send the event
    if __name__ == '__main__':
        # Get the Kafka topic name
        parseArguments()
        # Get the avro schemas for the message's key and value
        event_value_schema = getDefaultEventValueSchema(DATA_SCHEMAS)
        event_key_schema = getDefaultEventKeySchema(DATA_SCHEMAS)
        # Create the event
        event_value, event_key = createEvent()
        # Print out the event to be sent
As you can see, this Python code depends on some Avro utilities for loading the Avro schemas and on a Kafka Avro Producer to send the messages. These are explained next.

Avro Utils

This script, called avroEDAUtils.py, contains some very simple utility functions to be able to load Avro schemas from their avsc files in order to be used by the Kafka Avro Producer.

  1. A function to get the key and value Avro schemas for the messages to be sent:

    def getDefaultEventValueSchema(schema_files_location):
        # Get the default event value data schema
        known_schemas = avro.schema.Names()
        default_event_value_schema = LoadAvsc(schema_files_location + "/default_value.avsc", known_schemas)
        return default_event_value_schema

    def getDefaultEventKeySchema(schema_files_location):
        # Get the default event key data schema
        known_schemas = avro.schema.Names()
    (*) Where known_schemas is an Avro schema dictionary in which every Avro schema read is stored, so that nested Avro schemas can be resolved afterwards. See the Python script for the details; a hedged sketch of this mechanism follows the list below.

  2. A function to open a file, read its content as an Avro schema and store it in the Avro schema dictionary:

    def LoadAvsc(file_path, names=None):
        # Load avsc file
        # file_path: path to schema file
        # names(optional): avro.schema.Names object
        file_text = open(file_path).read()
        json_data = json.loads(file_text)
        schema = avro.schema.SchemaFromJSONData(json_data, names)
        return schema
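
The following is a hedged sketch (with illustrative schema names, not taken from the lab's repository) of how the known_schemas dictionary allows one Avro schema to reference a record type defined by another one:

import json
import avro.schema

known_schemas = avro.schema.Names()

# The first schema registers the record type schemas.demo.Address in the dictionary
address_schema = avro.schema.SchemaFromJSONData(json.loads('''
{"type": "record", "name": "Address", "namespace": "schemas.demo",
 "fields": [{"name": "city", "type": "string"}]}'''), known_schemas)

# A second schema can now refer to schemas.demo.Address by name
customer_schema = avro.schema.SchemaFromJSONData(json.loads('''
{"type": "record", "name": "Customer", "namespace": "schemas.demo",
 "fields": [{"name": "name", "type": "string"},
            {"name": "address", "type": "schemas.demo.Address"}]}'''), known_schemas)
print(customer_schema)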

Kafka Avro Producer

This script, called KcAvroProducer.py, is the one actually responsible for creating the Kafka Avro Producer, initializing and configuring it, and providing the publish method (a hedged configuration sketch follows the excerpts below):

  1. Initialize and prepare the Kafka Producer

    class KafkaProducer:

        def __init__(self, kafka_brokers = "", scram_username = "", scram_password = "", schema_registry_url = ""):
            self.kafka_brokers = kafka_brokers
            self.scram_username = scram_username
            self.scram_password = scram_password
            self.schema_registry_url = schema_registry_url

        def prepareProducer(self, groupID = "pythonproducers", key_schema = "", value_schema = ""):
  2. Publish method

    def publishEvent(self, topicName, value, key):
        # Produce the Avro message
        # Important: value DOES NOT come in JSON format from ContainerAvroProducer.py. Therefore, we must convert it to JSON format first
        self.producer.produce(topic=topicName, value=json.loads(value), key=json.loads(value)[key], callback=self.delivery_report)
        # Flush
        self.producer.flush()
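
The excerpt above omits the body of prepareProducer. As a rough, hedged sketch (not the repository's exact code; build_avro_producer is a hypothetical helper), the underlying AvroProducer is likely configured with settings similar to the consumer configuration printed later in this lab:

import os
from confluent_kafka.avro import AvroProducer

def build_avro_producer(kafka_brokers, scram_username, scram_password,
                        schema_registry_url, key_schema, value_schema):
    # SASL/SCRAM connection and Schema Registry settings, mirroring the consumer
    # configuration shown later in this lab
    options = {
        "bootstrap.servers": kafka_brokers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": scram_username,
        "sasl.password": scram_password,
        "ssl.ca.location": os.environ["PEM_CERT"],
        "schema.registry.url": schema_registry_url,
        "schema.registry.ssl.ca.location": os.environ["PEM_CERT"],
    }
    # key_schema/value_schema are the avro.schema objects loaded by the Avro utils above
    return AvroProducer(options,
                        default_key_schema=key_schema,
                        default_value_schema=value_schema)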

Python Avro Consumer

In this section we describe the Python scripts we will be using in order to be able to consume Avro messages from a Kafka topic.

Consume Message

The Python script that we will use to consume an Avro message from a Kafka topic is ConsumeAvroMessage.py, where we have the following:

  1. A function to parse arguments:

    # Parse arguments to get the container ID to poll for
    def parseArguments():
        global TOPIC_NAME
        print("The arguments for the script are: " , str(sys.argv))
        if len(sys.argv) != 2:
            print("[ERROR] - The ConsumeAvroMessage.py script expects one arguments: The Kafka topic to events from.")
            exit(1)
        TOPIC_NAME = sys.argv[1]
  2. The main where we will:

    1. Parse the arguments to get the topic to read from
    2. Create the Kafka Consumer and configure it
    3. Poll for next avro message
    4. Close the Kafka consumer
    if __name__ == '__main__':
        # Parse arguments
        parseArguments()
        # Create the Kafka Avro consumer
        kafka_consumer = KafkaConsumer(KAFKA_BROKERS, SCRAM_USERNAME, SCRAM_PASSWORD, TOPIC_NAME, SCHEMA_REGISTRY_URL)
        # Prepare the consumer
        kafka_consumer.prepareConsumer()
        # Consume next Avro event
        kafka_consumer.pollNextEvent()

As you can see, this Python code depends on a Kafka Avro Consumer to consume messages. This is explained next.

Kafka Avro Consumer

This script, called KcAvroConsumer.py, is the one actually responsible for creating the Kafka Avro Consumer, initializing and configuring it, and providing the poll-next-event method (a hedged configuration sketch follows the excerpts below):

  1. Initialize and prepare the new Kafka consumer:

    class KafkaConsumer:

        def __init__(self, kafka_brokers = "", scram_username = "", scram_password = "", topic_name = "", schema_registry_url = "", autocommit = True):
            self.kafka_brokers = kafka_brokers
            self.scram_username = scram_username
            self.scram_password = scram_password
            self.topic_name = topic_name
            self.schema_registry_url = schema_registry_url
            self.kafka_auto_commit = autocommit
  2. Poll next event method:

    # Prints out the message
    def traceResponse(self, msg):
        print('[Message] - Next message consumed from {} partition: [{}] at offset {} with key {} and value {}'
              .format(msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

    # Polls for next event
    def pollNextEvent(self):
        # Poll for messages
        msg = self.consumer.poll(timeout=10.0)
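
As with the producer, the configuration details are omitted from the excerpt. The following hedged sketch (build_avro_consumer is a hypothetical helper, not the repository's exact code) reproduces the consumer configuration that ConsumeAvroMessage.py prints when it runs later in this lab:

import os
from confluent_kafka.avro import AvroConsumer

def build_avro_consumer(kafka_brokers, scram_username, scram_password,
                        schema_registry_url, topic_name):
    options = {
        "bootstrap.servers": kafka_brokers,
        "group.id": "pythonconsumers",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": True,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": scram_username,
        "sasl.password": scram_password,
        "ssl.ca.location": os.environ["PEM_CERT"],
        "schema.registry.url": schema_registry_url,
        "schema.registry.ssl.ca.location": os.environ["PEM_CERT"],
    }
    consumer = AvroConsumer(options)
    consumer.subscribe([topic_name])
    return consumer

# Polling then decodes the Avro key and value back into Python dictionaries, e.g.:
# consumer = build_avro_consumer(...)
# msg = consumer.poll(timeout=10.0)
# if msg is not None and msg.error() is None:
#     print(msg.key(), msg.value())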

Schemas and Messages

In this section we are going to see how Schema Registry works when you have an application that produces and consumes messages based on Avro data schemas. The application we are going to use for this is the python application presented above in the Python Avro Producer and Python Avro Consumer sections.

IMPORTANT: Before we start using our Python application, we must set the PYTHONPATH environment variable to point to where all the Python scripts that make up our application are located, so that Python can find them at execution time.

  1. Set the PYTHONPATH variable to the location where you cloned the GitHub repository containing the Python application we are going to be working with
export PYTHONPATH=~/refarch-eda-tools/labs/es-cp4i-schema-lab-v10

Produce a message

In order to produce a message, we execute ProduceAvroMessage.py. This script, as you saw in the Python Avro Producer section, sends an event with key {'key': 1} and value {'message': 'This is a test message'}, according to the schemas defined in default_key.avsc and default_value.avsc for the key and the value of the event, respectively.

python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": "This is a test message"}

We can see our new message delivered in the test-schema-user1 topic by

  1. Go into the topics page in the IBM Event Streams UI

    14
  2. Click on the topic and then on the Messages tab at the top. Finally, click on a message to see it displayed on the right hand side of the screen

    15

IMPORTANT: As you can see, the test-schema-user1 topic was auto-created when we produced the message. The reason for this is that

  1. Kafka is configured out of the box to let applications auto-create topics.
  2. We created the SCRAM credentials for our application in a way that allows it to create topics.

In a production-like environment, you don’t want developers creating applications that auto-create topics in your IBM Event Streams instance without any control. For that, we would configure Kafka to forbid topic auto-creation (https://kafka.apache.org/documentation/#auto.create.topics.enable) as well as carefully create the SCRAM credentials with the strictest permissions that our application needs.

IMPORTANT: Similar to the auto-creation of topics, we can see below that the Avro data schemas for both the key and the value of the message our application produced got auto-registered. This is because many client libraries come with a SerDes property that allows them to auto-register Avro data schemas (https://docs.confluent.io/current/clients/confluent-kafka-python/#avroserializer). However, in a production-like environment we don’t want applications to auto-register schemas without any control, and we cannot simply rely on developers to turn the auto-registration property off in their libraries. Instead, we would create the SCRAM credentials with the strictest permissions that our application needs.

If we look now at the schemas our schema registry has:

cloudctl es schemas
Schema State Latest version Latest version ID Updated
demoSchema_CLI_USER1 active 2.0.0 2 Fri, 24 Jul 2020 14:09:37 UTC
demoSchema_UI_USER1 active 2.0.0 2 Fri, 24 Jul 2020 14:06:27 UTC
test-schema-user1-key-d89uk active 1 1 Fri, 24 Jul 2020 15:41:46 UTC
test-schema-user1-value-tv5efr active 1 1 Fri, 24 Jul 2020 15:41:45 UTC
OK

we see two schemas, test-schema-user1-key-d89uk and test-schema-user1-value-tv5efr, which in fact correspond to the Avro data schemas used for the key (default_key.avsc) and the value (default_value.avsc) of the events sent to the test-schema-user1 topic by ProduceAvroMessage.py, as explained before we sent the message.

To verify this, we can inspect those schemas:

cloudctl es schema test-schema-user1-key-d89uk --version 1
{
"type": "record",
"name": "defaultKey",
"namespace": "ibm.eda.default",
"fields": [
{
"type": "int",
cloudctl es schema test-schema-user1-value-tv5efr --version 1
{
"type": "record",
"name": "defaultValue",
"namespace": "ibm.eda.default",
"fields": [
{
"type": "string",

If I now decided that my events should contain another attribute, I would modify the event value schema (default_value.avsc) to reflect that, as well as ProduceAvroMessage.py so that it includes the new attribute in the event it sends:

python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": "This is a test message", "anotherAttribute": "Just another test string"}

I can see that an event with a new attribute has been sent:

16

And I can also see that the new schema has been registered as well:

cloudctl es schemas
Schema State Latest version Latest version ID Updated
demoSchema_CLI_USER1 active 2.0.0 2 Fri, 24 Jul 2020 14:09:37 UTC
demoSchema_UI_USER1 active 2.0.0 2 Fri, 24 Jul 2020 14:06:27 UTC
test-schema-user1-key-d89uk active 1 1 Fri, 24 Jul 2020 15:41:46 UTC
test-schema-user1-value-a5bbaa active 1 1 Fri, 24 Jul 2020 15:54:37 UTC
test-schema-user1-value-tv5efr active 1 1 Fri, 24 Jul 2020 15:41:45 UTC
OK

If I inspect that new schema, I see my new attribute in it:

cloudctl es schema test-schema-user1-value-a5bbaa --version 1
{
"type": "record",
"name": "defaultValue",
"namespace": "ibm.eda.default",
"fields": [
{
"type": "string",

Create a non-compliant message

Let’s see what happens if we send a message that does not comply with its Avro data schema. Let’s say that I send the following message:

key = {"key": 1}
value = {"message" : 12345}

and this is the output of that attempt:

python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": 12345}

As we can see, the attempt failed: the Avro producer checks the message against the Avro data schema defined for the topic we want to send the message to and reports that this message does not comply (the message value attribute we are sending is an integer rather than a string, and we are missing the second attribute).

Therefore, using Avro schemas with IBM Event Streams gives us the ability to build our system robustly, protecting downstream data consumers from malformed data, as only valid data will be permitted in the topic.
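
To illustrate the kind of validation that fails here, this minimal sketch (using the avro-python3 module directly, with a simplified version of the value schema) shows the serializer rejecting a non-conforming record:

import io
import json
import avro.io
import avro.schema

# A simplified version of the value schema (see default_value.avsc in the lab's repository)
schema = avro.schema.Parse(json.dumps({
    "type": "record", "name": "defaultValue", "namespace": "ibm.eda.default",
    "fields": [{"name": "message", "type": "string"}]}))

try:
    # The message attribute is an integer instead of a string, so serialization fails
    avro.io.DatumWriter(schema).write({"message": 12345},
                                      avro.io.BinaryEncoder(io.BytesIO()))
except Exception as err:  # avro raises an AvroTypeException for non-conforming data
    print("Message rejected:", err)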

Consume a message

In order to consume a message, we execute the ConsumeAvroMessage.py script in our Python demo environment:

python3 ConsumeAvroMessage.py test-schema-user1
@@@ Executing script: ConsumeAvroMessage.py
The arguments for this script are: ['ConsumeAvroMessage.py', 'test-schema-user1']
--- This is the configuration for the Avro consumer: ---
{'bootstrap.servers': 'es-1-kafka-bootstrap-integration.apps.eda-solutions.gse-ocp.net:443', 'group.id': 'pythonconsumers', 'auto.offset.reset': 'earliest', 'schema.registry.url': 'https://my-user1:*****@es-1-ibm-es-schema-external-integration.apps.eda-solutions.gse-ocp.net', 'enable.auto.commit': True, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'SCRAM-SHA-512', 'sasl.username': 'my-user1', 'sasl.password': '*****', 'ssl.ca.location': '/home/ALMARAZJ/es-cert.pem', 'schema.registry.ssl.ca.location': '/home/ALMARAZJ/es-cert.pem'}
---------------------------------------------------
[Message] - Next message consumed from test-schema-user1 partition: [0] at offset 0 with key {'key': 1} and value {'message': 'This is a test message'}

As you can see, our script was able to read the Avro messages from the test-schema-user1 topic and map them back to their original structure thanks to the Avro schemas:

[Message] - Next message consumed from test-schema partition: [0] at offset 0 with key {'key': 1} and value {'message': 'This is a test message'}
[Message] - Next message consumed from test-schema partition: [0] at offset 1 with key {'key': 1} and value {'message': 'This is a test message', 'anotherAttribute': 'Just another test string'}

Data Evolution

So far we have seen, more or less, what Avro is, what an Avro data schema is, what a schema registry is, and how this all works together: from creating an Avro data schema for your messages/events to comply with, to how the schema registry and Avro data schemas work together. We have also seen the code for doing all this, from the Python code that sends and receives Avro-encoded messages based on their Avro data schemas, to the rich CLI that IBM Event Streams provides.

However, we have said little about the need for data to evolve. When you design an Event Driven architecture for your application (by applying Event Storming or Domain Driven Design for example), it is very hard to come up with data structures/schemas that will not need to evolve/change in time. That is, your data, like your use or business cases, may need to evolve. As a result, Avro data schemas must be somehow flexible to allow your data to evolve along with your application and use cases.

But it is not as easy as simply adding or removing the data that travels in your events/messages or modifying its type. One of the reasons for this is that Kafka (or any other type of event backbone) is often used as the source of truth: that is, a place that you can trust as to what has happened. Hence, Kafka will serve as the source of truth where all the events (that is, data) that happened (which could be bank transactions, communications, etc.) get stored, sometimes for many years, and can be replayed if needed. As a result, there must be data schema management and data schema evolution mechanisms in place that allow compatibility between old and new data schemas and, ultimately, between old and new data.

The IBM Event Streams Schema Registry enforces full compatibility when creating a new version of a schema. Full compatibility means that old data can be read with the new data schema, and new data can also be read with the previous data schema.

In data formats like Avro, you can define fields with default values. In that case, adding or removing a field that has a default value is a fully compatible change.

Let’s see what that means in terms of adding and removing attributes from your data schema.

Adding a new attribute

Although we have already seen this in the section on adding a new version of a schema, let’s try to add a new version of our test-schema-user1 value schema that has a new attribute. Remember, our default_value.avsc already contains one more attribute than the original schema, but it got registered as a new schema rather than as a new version of the original one. Let’s reuse that Avro schema file and register it as a new version (INFO: you might need to copy/download that file to your local workstation in order to be able to upload it to IBM Event Streams through its UI).

When doing so from the UI, we see the following error:

17

The reason, as already explained in the add a new version of a schema section, is that full compatibility dictates that you can only add new attributes to a schema if they have a default value. A receiver should be able to deserialize messages produced with an older schema using the newer schema. Because old messages were written with an older schema that did not contain our new attribute, those messages won’t have that attribute, so we need to provide a default value for it in the newer version of the schema so that the receiver is able to deserialize those older messages with the newer schema.

If we add the default value for the new attribute, we see that our newer version is now compatible:

18

and that it gets registered fine:

19

Removing an existing attribute

What if we now wanted to remove the original message attribute from our schema? Let’s remove it from the default_value.avsc file and try to register that new version:

20

We, again, get the same error. The reason is that receivers must be able to read and deserialize messages produced with the newer schema (that is, without the message attribute) using the older schema (that is, the schema version that enforces the existence of the message attribute).

In order to work around this, what we need to do is first register an intermediate schema version that defines a default value for the message attribute:

21

Once we have a default value for the message attribute, we can register a new version of the schema that finally removes that attribute:

22

Security

As we have already mentioned during this tutorial, we need to pay attention to the permissions we give to users, groups, applications (and therefore the clients they use to interact with IBM Event Streams), etc., since we don’t want everyone and everything to be, for instance, creating or deleting topics, schemas, etc.

You can secure your IBM Event Streams resources in a fine-grained manner by managing the access each user and application has to each resource. Within IBM Event Streams, you can secure access to the following resource types, where the names in parentheses are the resource type names used in Access Control List (ACL) rules:

  • Topics (topic): you can control the ability of users and applications to create, delete, read, and write to a topic.
  • Consumer groups (group): you can control an application’s ability to join a consumer group.
  • Transactional IDs (transactionalId): you can control the ability to use the transaction capability in Kafka.

Note: Schemas in the Event Streams Schema Registry are a special case and are secured using the resource type of topic combined with a prefix of __schema_. You can control the ability of users and applications to create, delete, read, and update schemas.

You can find more information about how to secure your IBM Event Streams resources in the official documentation at: https://ibm.github.io/event-streams/security/managing-access/