IBM Event Streams Schema Registry from IBM Cloud Pak for Integration
This documentation is an introductory hands-on lab on the IBM Event Streams Schema Registry installed through IBM Cloud Pak for Integration V2020.2.X+ on an OpenShift cluster.
Index
- Requirements
- IBM Cloud Shell
- Schema Registry
- Schemas
- IBM Event Streams Credentials
- Python Application
- Python Avro Producer
- Python Avro Consumer
- Schemas and Messages
- Data Evolution
- Security
Requirements
This lab requires the following components:
- An IBM Event Streams V10 instance installed through IBM Cloud Pak for Integration V2020.2.X or greater.
- An IBM Cloud Shell - https://www.ibm.com/cloud/cloud-shell
IBM Cloud Shell
Here we are going to set up our IBM Cloud Shell with all the tools required to carry out this lab.
Start your IBM Cloud Shell by pointing your browser to https://cloud.ibm.com/shell

IBM Cloud Pak CLI
Cloudctl is a command line tool to manage Container Application Software for Enterprises (CASEs). This CLI will allow us to manage Cloud Pak related components as well as software, like IBM Event Streams, installed through any IBM Cloud Pak.
In order to install it, execute the following commands in your IBM Cloud Shell:
- Download the IBM Cloud Pak CLI:
curl -L https://github.com/IBM/cloud-pak-cli/releases/latest/download/cloudctl-linux-amd64.tar.gz -o cloudctl-linux-amd64.tar.gz
- Untar it:
tar -xvf cloudctl-linux-amd64.tar.gz
- Rename it for ease of use:
mv cloudctl-linux-amd64 cloudctl
- Add it to the PATH environment variable:
export PATH=$PATH:$PWD
- Make sure your IBM Cloud Pak CLI is in the path:
which cloudctl
- Make sure your IBM Cloud Pak CLI works:
cloudctl help

Note: If you are not using the IBM Cloud Shell to run the lab, be aware that the cloudctl CLI requires the kubectl CLI. To install the kubectl CLI on your personal environment, follow the instructions here
Event Streams plugin for IBM Cloud Pak CLI
This plugin will allow us to manage IBM Event Streams.
In order to install it, execute the following commands in your IBM Cloud Shell:
- Download the Event Streams plugin for IBM Cloud Pak CLI:
curl -L http://ibm.biz/es-cli-linux -o es-plugin
- Install it:
cloudctl plugin install es-plugin
- Make sure it works:
cloudctl es help

Git
IBM Cloud Shell comes with Git already installed out of the box.
Vi
IBM Cloud Shell comes with Vi already installed out of the box.
Python 3
IBM Cloud Shell comes with Python 3 already installed out of the box. However, we need to install the following modules that will be used later on in this tutorial when we run a Python application to work with Avro, schemas and messages. These modules are confluent_kafka and avro-python3.
In order to install these modules, execute the following command in your IBM Cloud Shell:
- Install the modules:
python3 -mpip install avro-python3 confluent_kafka --user

Congrats! Your IBM Cloud Shell is now ready to start working.
Schema Registry

One of the most common technologies used in the industry these days to define, serialize and deserialize messages flowing through your Kafka topics is Apache Avro (https://avro.apache.org/docs/current/). To learn more about Apache Avro, how to define Apache Avro data schemas and more, we strongly recommend reading through our documentation on Avro and data schemas here
The IBM Event Streams development team has developed a Schema Registry that works alongside your Kafka cluster and provides a place to store descriptions of the message formats used by your producers and consumers. The benefit of storing these descriptions is that you are able to validate that your producing and consuming applications will correctly interoperate. The Schema Registry also allows schemas to evolve over time.
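To get a first feel for what working with an Avro schema looks like in code, here is a minimal sketch using only the avro-python3 module installed earlier; the record and field names are purely illustrative:

```python
import avro.schema

# A simple Avro record schema with two string fields (illustrative names).
schema_json = {
    "type": "record",
    "name": "demoSchema",
    "namespace": "schemas.demo",
    "fields": [
        {"name": "eventKey", "type": "string"},
        {"name": "message", "type": "string"}
    ]
}

# Parse the JSON definition into an avro-python3 schema object.
# avro.schema.Names() keeps track of named types so that nested schemas can be resolved.
known_schemas = avro.schema.Names()
schema = avro.schema.SchemaFromJSONData(schema_json, known_schemas)

print(schema.name)                       # demoSchema
print(schema.namespace)                  # schemas.demo
print([f.name for f in schema.fields])   # ['eventKey', 'message']
```

Definitions of exactly this shape are what we will register in the Schema Registry throughout the rest of this lab.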
Accessing the Schema Registry
UI
To access the schema registry, we first need to log into IBM Event Streams.
Point your browser to your IBM Event Streams instance's user interface URL and enter your credentials.
Once you are logged into your IBM Event Streams instance, you simply need to click on the Schema Registry button on the main left hand vertical menu bar:
CLI
We can also interact with the Schema Registry through the IBM Event Streams CLI. In order to do so, we first need to log in with the IBM Cloud Pak CLI:
Log into your cluster with the IBM Cloud Pak CLI:

cloudctl login -a https://cp-console.apps.eda-solutions.gse-ocp.net --skip-ssl-validation
Username> admin
Password>
Authenticating...
OK
Targeted account mycluster Account

Initialize the Event Streams CLI plugin:

cloudctl es init
IBM Cloud Platform Common Services endpoint:   https://cp-console.apps.eda-solutions.gse-ocp.net
Namespace:                                     integration
Name:                                          es-1
IBM Cloud Pak for Integration UI address:      No instance of Cloud Pak for Integration has been found. Please check that you have access to it.
Event Streams API endpoint:                    https://es-1-ibm-es-admapi-external-integration.apps.eda-solutions.gse-ocp.net
Event Streams API status:                      OK
Event Streams UI address:                      https://es-1-ibm-es-ui-integration.apps.eda-solutions.gse-ocp.net

(*) The above information will later be used in the IBM Event Streams Credentials section, as these values are needed by the Python application we will work with.
Make sure you can access the IBM Event Streams Schema Registry:
cloudctl es schemas
No schemas were found.
OK
Schemas
In this section we will finally get our hands dirty with the IBM Event Streams Schema Registry capability by working with Apache Avro schemas and the Schema Registry.
Create a schema
Let’s see how we can create a schema to start playing with.
UI
The IBM Event Streams user interface allows us to create schemas only from JSON or Avro schema avsc files.
Create an Avro schema file avsc with your schema:
echo '{"type":"record","name":"demoSchema_UI_USER1","namespace": "schemas.demo.ui","fields":[{"name": "eventKey","type":"string"},{"name": "message","type":"string"}]}' > demoshema-ui.avscOn the IBM Event Streams Schema Registry User Interface, Click on Add schema button on the top right corner.
Click on the Upload definition button on the left hand side and select the demoschema-ui.avsc file we just created. You should now see your Avro schema loaded in the UI with two tabs, definition and preview, so you can make sure your schema looks as desired:
Click on the Add schema button in the top right corner and you should now see that schema listed among your other schemas.
CLI
Create another Avro schema avsc file with a different schema:
echo '{"type":"record","name":"demoSchema_CLI_USER1","namespace": "schemas.demo.cli","fields":[{"name": "eventKey","type":"string"},{"name": "message","type":"string"}]}' > demoshema-cli.avscCreate a schema by executing the following command:
cloudctl es schema-add --file demoschema-cli.avsc
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
1.0.0     1            demoSchema_CLI_USER1   active   Thu, 25 Jun 2020 11:30:42 UTC

Added version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
OK
List schemas
UI
In order to list the schemas in the UI, you simply need to open the Schema Registry user interface and the schemas will be listed there automatically. You also have a search bar at the top. You can also see more details about a schema by clicking the drop-down arrow on its left:
CLI
Execute the following command to list the schemas in your Schema Registry:
cloudctl es schemas
Schema                 State    Latest version   Latest version ID   Updated
demoSchema_CLI_USER1   active   1.0.0            1                   Fri, 24 Jul 2020 13:55:49 UTC
demoSchema_UI_USER1    active   1.0.0            1                   Fri, 24 Jul 2020 13:55:51 UTC
OK
Delete schemas
UI
Click on the schema you want to delete.
Click on the Manage schema tab at the top.
Click on Remove schema
CLI
To remove a schema using the CLI, simply execute the following command and confirm:
cloudctl es schema-remove demoSchema_CLI_USER1
Remove schema demoSchema_CLI_USER1 and all versions? [y/n]> y
Schema demoSchema_CLI_USER1 and all versions removed.
OK
Create new schema version
To create a new version of a schema, let's first create the previous two schemas again:
cloudctl es schema-add --file demoschema-ui.avsc
Schema demoSchema_UI_USER1 is active.

Version   Version ID   Schema                State    Updated                         Comment
1.0.0     1            demoSchema_UI_USER1   active   Fri, 24 Jul 2020 13:59:55 UTC

Added version 1.0.0 of schema demoSchema_UI_USER1 to the registry.
OK

cloudctl es schema-add --file demoschema-cli.avsc
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
1.0.0     1            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:00:45 UTC

Added version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.
OK

Add a new attribute to the schemas by editing their Avro schema avsc files:
cat demoschema-ui.avsc
{"type":"record","name":"demoSchema_UI_USER1","namespace": "schemas.demo.ui","fields":[{"name": "eventKey","type":"string"},{"name": "message","type":"string"}, ...
UI
Click on the schema you want to create a new version for.
Click on the Add new version button on the left hand side.
Click on Upload definition button on the left hand side.
Select the Avro schema avsc file and click ok.
Full compatibility for data schemas means that messages that have been serialized with an earlier version of a schema can be deserialized with a later version. To be compatible, fields in later versions of a schema cannot be removed, and any new schema field must have a default value. More on data schema compatibility in the Data Evolution section towards the end of this lab.
As explained in the error notification above, we need to add a default value for our new attribute so that messages serialized with an older version of the data schema, which won't contain this new attribute, can later be deserialized with the newer version that expects it. By providing a default value, we allow deserializers to consume messages that do not contain the newer attribute.
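To make this concrete, here is a small avro-python3 sketch (separate from the lab application) showing that a record serialized with the original schema can still be deserialized with a newer schema, as long as the added field carries a default value. The attribute name attribute1 matches the one reported by the CLI error later in this section; the record name and the default value "n/a" are just illustrative:

```python
import io
import avro.io
import avro.schema

# Version 1 of the value schema (illustrative): only the original field.
old_schema = avro.schema.SchemaFromJSONData({
    "type": "record", "name": "demoValue", "namespace": "schemas.demo",
    "fields": [{"name": "message", "type": "string"}]
}, avro.schema.Names())

# Version 2 adds "attribute1" WITH a default value, so it stays fully compatible.
new_schema = avro.schema.SchemaFromJSONData({
    "type": "record", "name": "demoValue", "namespace": "schemas.demo",
    "fields": [
        {"name": "message", "type": "string"},
        {"name": "attribute1", "type": "string", "default": "n/a"}
    ]
}, avro.schema.Names())

# Serialize a message with the OLD schema (it has no attribute1).
buffer = io.BytesIO()
avro.io.DatumWriter(old_schema).write({"message": "hello"}, avro.io.BinaryEncoder(buffer))

# Deserialize it with the NEW schema: the missing field is filled in from its default.
decoder = avro.io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
record = avro.io.DatumReader(old_schema, new_schema).read(decoder)
print(record)   # {'message': 'hello', 'attribute1': 'n/a'}
```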
Add a default value for the new attribute:
cat demoschema-ui.avsc
{"type":"record","name":"demoSchema_UI_USER1","namespace": "schemas.demo.ui","fields":[{"name": "eventKey","type":"string"},{"name": "message","type":"string"}, ...

Repeat the steps above for adding a new version of a schema.
This time you should see that the schema is valid:
However, it still does not let us add this new version of the data schema until we actually provide a version for it. Click on the Add + link on the right of the version attribute of the schema and give it 2.0.0, for example (hit enter for the version to take the value you type in). Click on Add schema.
You should now see the two versions for your data schema on the left hand side.
If you go back to the Schema Registry page where all your schemas are listed, you should now see that the latest version of your data schema is now 2.0.0.
CLI
If we try to add the new version of the schema from its demoschema-cli.avsc Avro schema file, we will get the same error as in the previous UI example:

cloudctl es schema-add --file demoschema-cli.avsc
FAILED
Event Streams API request failed:
Error response from server. Status code: 400. Avro schema is not compatible with latest schema version: Compatibility type 'MUTUAL_READ' does not hold between 1 schema(s) in the chronology because: Schema[0] has incompatibilities: ['READER_FIELD_MISSING_DEFAULT_VALUE: attribute1' at '/fields/2'].
Unable to add version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.

Add the default value for the new attribute in your Avro schema avsc file and try to add that new version of the schema:
cloudctl es schema-add --file demoschema-cli.avsc
FAILED
Event Streams API request failed:
Error response from server. Status code: 409. Schema version name already exists
Unable to add version 1.0.0 of schema demoSchema_CLI_USER1 to the registry.

We see that we still get an error because we have not specified a new version value. Specify a new version value when adding this new version of the schema:
cloudctl es schema-add --file demoschema-cli.avsc --version 2.0.0
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
1.0.0     1            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:00:45 UTC
2.0.0     2            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:09:37 UTC

Added version 2.0.0 of schema demoSchema_CLI_USER1 to the registry.
Get latest version of a schema
UI
In order to see the latest version of a data schema using the UI, we just need to go to the Schema Registry web user interface and click on the expand arrow button on the left:
CLI
In order to see the latest version of a data schema using the CLI, we simply need to run the following command:
cloudctl es schema demoSchema_CLI_USER1 --version 2
{
  "type": "record",
  "name": "demoSchema_CLI_USER1",
  "namespace": "schemas.demo.cli",
  "fields": [
    {
      "name": "eventKey",
      ...
(*) The version you specify is actually the version ID (2) rather than the version name we gave to the newer schema version (2.0.0):
cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
1.0.0     1            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:00:45 UTC
2.0.0     2            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:09:37 UTC
OK
Get specific version of a schema
UI
To see a specific version of a schema, go to the Schema Registry web user interface and click on the schema you want to see the version for. You will now see how many versions of the schema you have, and you can click on any of these in order to see more details about it.
CLI
To see a specific version of a schema using the CLI, simply run the following command with the version ID you would like to retrieve:
cloudctl es schema demoSchema_CLI_USER1 --version 1
{
  "type": "record",
  "name": "demoSchema_CLI_USER1",
  "namespace": "schemas.demo.cli",
  "fields": [
    {
      "name": "eventKey",
      ...
Listing all versions of a schema
UI
To list all versions of a schema in the Schema Registry user interface, you simply need to click on the data schema you want and a new page will display them:
CLI
In order to display all versions of a schema, run the following command:
cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
1.0.0     1            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:00:45 UTC
2.0.0     2            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:09:37 UTC
OK
Deleting a version of a schema
UI
In order to delete a version of a schema using the Schema Registry user interface,
Click on the data schema you want a version of it deleted for.
Select the version you want to delete on the left hand side.
Click on Manage version button that is on the top right corner within the main box in the center of the page.
Click on Remove version.
CLI
In order to delete a version of a schema through the CLI, execute the following command:
cloudctl es schema-remove demoSchema_CLI_USER1 --version 1
Remove version with ID 1 of schema demoSchema_CLI_USER1? [y/n]> y
Version with ID 1 of schema demoSchema_CLI_USER1 removed.
OK
We can see only version 2 now:
cloudctl es schema demoSchema_CLI_USER1
Schema demoSchema_CLI_USER1 is active.

Version   Version ID   Schema                 State    Updated                         Comment
2.0.0     2            demoSchema_CLI_USER1   active   Fri, 24 Jul 2020 14:09:37 UTC
OK
IBM Event Streams Credentials
We have seen how to interact with the IBM Event Streams Schema Registry in order to create, delete, and update the schemas that our applications will use for data correctness and application robustness. However, the first thing that we need to set up in our IBM Event Streams instance is the service credentials these applications need in order to interact with IBM Event Streams and its Schema Registry. To do so, we can use either the GUI or the CLI.
GUI
Go to your IBM Event Streams instance console
Click on Connect to this cluster
In this panel, you will find
The Bootstrap server your applications connect to in order to send and receive messages from your IBM Event Streams instance. We can see we have one external listener (with SCRAM-SHA authentication) and one internal listener (depending on your IBM Event Streams installation you might have different listeners and authentication mechanisms for these).
The Schema Registry URL your applications will need in order to work with Apache Avro data schemas.
A Generate SCRAM credentials button to generate the SCRAM credentials your applications will need to authenticate with.
A Certificates section to download either the Java PKCS12 or the PEM certificates (or both) that your applications will need in order to be able to establish communication with your IBM Event Streams instance.
To generate the SCRAM credentials your application needs to authenticate against IBM Event Streams in order to produce and consume messages, as well as to create and delete topics and schemas, we need to create a KafkaUser (this happens behind the scenes) for which we will set some permissions and get the corresponding SCRAM username and password to be used in our application's Kafka client configuration:
Click on Generate SCRAM credentials
Enter a user name for your credentials and click next (leave the last option selected: Produce messages, consume messages and create topics and schemas so that we give full access to our user for simplicity)
Select all topics and click next
Select all consumer groups and click next
Select all transactional IDs and click next
Once you have created your new KafkaUser, you get the SCRAM credentials displayed on the screen:

CLI
Log into your cluster with the IBM Cloud Pak CLI:

cloudctl login -a https://cp-console.apps.eda-solutions.gse-ocp.net --skip-ssl-validation
Username> admin
Password>
Authenticating...
OK
Targeted account mycluster Account

Initialize the Event Streams CLI plugin:

cloudctl es init
IBM Cloud Platform Common Services endpoint:   https://cp-console.apps.eda-solutions.gse-ocp.net
Namespace:                                     integration
Name:                                          es-1
IBM Cloud Pak for Integration UI address:      No instance of Cloud Pak for Integration has been found. Please check that you have access to it.
Event Streams API endpoint:                    https://es-1-ibm-es-admapi-external-integration.apps.eda-solutions.gse-ocp.net
Event Streams API status:                      OK
Event Streams UI address:                      https://es-1-ibm-es-ui-integration.apps.eda-solutions.gse-ocp.net
We can see above the Event Streams endpoints and Schema Registry URL that our applications will need in order to connect to this Event Streams instance.
To be able to establish communication and authenticate against your IBM Event Streams instance, you will need the PEM certificate and the SCRAM credentials:
To download your PEM certificate, you can use the following command:
cloudctl es certificates --format pem
Certificate successfully written to /home/ALMARAZJ/es-cert.pem.
OK

To generate your SCRAM credentials, we first need to create a KafkaUser. You can use the following command:
cloudctl es kafka-user-create --name my-user1 --consumer --producer --schema-topic-create --all-topics --all-groups --all-txnids --auth-type scram-sha-512

KafkaUser name   Authentication   Authorization   Username                                                Secret
my-user1         scram-sha-512    simple          EntityOperator has not created corresponding username   EntityOperator has not created corresponding secret

Resource type   Name        Pattern type   Host   Operation
topic           *           literal        *      Read
topic           __schema_   prefix         *      Read
topic           *           literal        *      Write
When a KafkaUser custom resource is created, the Entity Operator within Event Streams will create the principal in ZooKeeper with appropriate ACL entries. It will also create a Kubernetes Secret that contains the Base64-encoded SCRAM password for the scram-sha-512 authentication type, or the Base64-encoded certificates and keys for the tls authentication type.
Retrieve the username and the secret name containing the password of your SCRAM credentials for your KafkaUser:
oc get kafkauser my-user1 --namespace integration -o jsonpath='{"username: "}{.status.username}{"\nsecret-name: "}{.status.secret}{"\n"}'
username: my-user1
secret-name: my-user1

Retrieve the password of your SCRAM credentials from the secret above:

oc get secret my-user1 --namespace integration -o jsonpath='{.data.password}' | base64 --decode
*****
Environment variables
Now that we have generated the appropriate IBM Event Streams credentials for applications to be able to establish communication and authenticate against our IBM Event Streams instance, we are going to set some environment variables that will be used by our Python application:
KAFKA_BROKERS, which should take the value of the bootstrap server:

export KAFKA_BROKERS=es-1-kafka-bootstrap-integration.apps.eda-solutions.gse-ocp.net:443

SCRAM_USERNAME, which should take the value of the SCRAM username you have generated:

export SCRAM_USERNAME=my-user1

SCRAM_PASSWORD, which should take the value of the SCRAM password you have generated:

export SCRAM_PASSWORD=*****

PEM_CERT, which should take the value of the location of the PEM certificate within your IBM Cloud Shell:

export PEM_CERT=~/es-cert.pem

(*) Don't forget to download the PEM certificate to your IBM Cloud Shell through the CLI, or to upload it to your IBM Cloud Shell from your laptop if you used the UI to get the certificate. Review the previous section if needed.

SCHEMA_REGISTRY_URL, which should be a combination of the SCRAM username, the SCRAM password and the Schema Registry URL in the form of:
https://<SCRAM_username>:<SCRAM_password>@<Schema_Registry_url>
export SCHEMA_REGISTRY_URL=https://${SCRAM_USERNAME}:${SCRAM_PASSWORD}@es-1-ibm-es-schema-external-integration.apps.eda-solutions.gse-ocp.net
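If you prefer to compose that URL programmatically (for instance, when the password contains characters that need URL-encoding), a small illustrative Python helper could look like the following; the registry host is the one shown by cloudctl es init above, and this snippet is not part of the lab scripts:

```python
import os
from urllib.parse import quote

# Compose the Schema Registry URL from the SCRAM credentials, URL-encoding them in case
# the password contains characters such as '@' or '/' (illustrative helper, not a lab script).
registry_host = "es-1-ibm-es-schema-external-integration.apps.eda-solutions.gse-ocp.net"
schema_registry_url = "https://{}:{}@{}".format(
    quote(os.environ["SCRAM_USERNAME"], safe=""),
    quote(os.environ["SCRAM_PASSWORD"], safe=""),
    registry_host,
)
print(schema_registry_url)
```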
Python Application
The Python application we have built to see how to produce and consume messages (either plain messages or Avro encoded messages based on Avro Data Schemas) to and from an IBM Event Streams instance installed through the IBM Cloud Pak for Integration is public at the following GitHub repo: https://github.com/ibm-cloud-architecture/refarch-eda-tools/tree/master/labs/es-cp4i-schema-lab-v10
Clone
In order to use and work with this Python application, the first thing we need to do is to clone the GitHub repository where it is published.
Clone the github repository on your workstation on the location of your choice:
git clone https://github.com/ibm-cloud-architecture/refarch-eda-tools.git
Cloning into 'refarch-eda-tools'...
remote: Enumerating objects: 185, done.
remote: Counting objects: 100% (185/185), done.
remote: Compressing objects: 100% (148/148), done.
remote: Total 185 (delta 23), reused 176 (delta 16), pack-reused 0
Receiving objects: 100% (185/185), 6.17 MiB | 4.61 MiB/s, done.
Resolving deltas: 100% (23/23), done.

Change directory into refarch-eda-tools/labs/es-cp4i-schema-lab-v10 to find the assets we will be working with from now on for the Python demo environment and a few other scripts/applications:

cd refarch-eda-tools/labs/es-cp4i-schema-lab-v10
$ ls -all
total 240
drwxr-xr-x  9 user  staff     288 20 May 19:33 .
drwxr-xr-x  3 user  staff      96 20 May 19:33 ..
-rw-r--r--  1 user  staff  112578 20 May 19:33 README.md
drwxr-xr-x  5 user  staff     160 20 May 19:33 avro_files
drwxr-xr-x  6 user  staff     192 20 May 19:33 kafka
In the next sections, we are going to briefly explain the implementation of this Python application so that you understand what is being done behind the scenes and more importantly, if you are a developer, how to do so.
Python Avro Producer
In this section we describe the Python scripts we will be using in order to produce Avro messages to a Kafka topic.
Produce Message
The Python script that we will use to send an Avro message to a Kafka topic is ProduceAvroMessage.py, where we have the following:
A function to parse the arguments:
def parseArguments():
    global TOPIC_NAME
    print("The arguments for this script are: " , str(sys.argv))
    if len(sys.argv) == 2:
        TOPIC_NAME = sys.argv[1]
    else:
        print("[ERROR] - The produceAvroMessage.py script expects one argument: The Kafka topic to publish the message to")
        exit(1)

A function to create the event to be sent:

def createEvent():
    print('Creating event...')
    key = {"key": 1}
    value = {"message" : "This is a test message"}
    print("DONE")
    return json.dumps(value), json.dumps(key)

The main where we will:
- Parse the arguments
- Get the Avro schemas for the key and value of the event
- Create the Event to be sent
- Print it out for reference
- Create the Kafka Avro Producer and configure it
- Send the event
if __name__ == '__main__':
    # Get the Kafka topic name
    parseArguments()
    # Get the avro schemas for the message's key and value
    event_value_schema = getDefaultEventValueSchema(DATA_SCHEMAS)
    event_key_schema = getDefaultEventKeySchema(DATA_SCHEMAS)
    # Create the event
    event_value, event_key = createEvent()
    # Print out the event to be sent
    ...
As you can see, this python code depends on an Avro Utils for loading the Avro schemas and a Kafka Avro Producer to send the messages. These are explained next.
Avro Utils
This script, called avroEDAUtils.py, contains some very simple utility functions to be able to load Avro schemas from their avsc files in order to be used by the Kafka Avro Producer.
A function to get the key and value Avro schemas for the messages to be sent:
def getDefaultEventValueSchema(schema_files_location):
    # Get the default event value data schema
    known_schemas = avro.schema.Names()
    default_event_value_schema = LoadAvsc(schema_files_location + "/default_value.avsc", known_schemas)
    return default_event_value_schema

def getDefaultEventKeySchema(schema_files_location):
    # Get the default event key data schema
    known_schemas = avro.schema.Names()
    ...

(*) Where known_schemas is an Avro schema dictionary in which all the Avro schemas read get stored, in order to be able to read nested Avro schemas afterwards. See the Python script in detail for examples of this.

A function to open a file, read its content as an Avro schema and store it in the Avro schema dictionary:
def LoadAvsc(file_path, names=None):
    # Load avsc file
    # file_path: path to schema file
    # names (optional): avro.schema.Names object
    file_text = open(file_path).read()
    json_data = json.loads(file_text)
    schema = avro.schema.SchemaFromJSONData(json_data, names)
    return schema
Kafka Avro Producer
This script, called KcAvroProducer.py, is actually responsible for creating the Kafka Avro Producer, initializing and configuring it, and providing the publish method:
Initialize and prepare the Kafka Producer
class KafkaProducer:

    def __init__(self, kafka_brokers = "", scram_username = "", scram_password = "", schema_registry_url = ""):
        self.kafka_brokers = kafka_brokers
        self.scram_username = scram_username
        self.scram_password = scram_password
        self.schema_registry_url = schema_registry_url

    def prepareProducer(self, groupID = "pythonproducers", key_schema = "", value_schema = ""):
        ...

Publish method:
    def publishEvent(self, topicName, value, key):
        # Produce the Avro message
        # Important: value DOES NOT come in JSON format from ContainerAvroProducer.py. Therefore, we must convert it to JSON format first
        self.producer.produce(topic=topicName, value=json.loads(value), key=json.loads(value)[key], callback=self.delivery_report)
        # Flush
        self.producer.flush()
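The prepareProducer body is cut off above. For reference, here is a minimal sketch of what such a configuration can look like using confluent_kafka's AvroProducer, assuming the same connection properties that the consumer configuration prints later in this lab; build_avro_producer is a hypothetical helper, not the repository's exact code:

```python
import os
from confluent_kafka.avro import AvroProducer, loads

def build_avro_producer(key_schema_str, value_schema_str):
    """Hypothetical helper: create an AvroProducer wired to Event Streams over SASL/SCRAM and TLS."""
    config = {
        "bootstrap.servers": os.environ["KAFKA_BROKERS"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": os.environ["SCRAM_USERNAME"],
        "sasl.password": os.environ["SCRAM_PASSWORD"],
        "ssl.ca.location": os.environ["PEM_CERT"],
        # The registry URL already embeds the SCRAM credentials (https://user:password@host).
        "schema.registry.url": os.environ["SCHEMA_REGISTRY_URL"],
        "schema.registry.ssl.ca.location": os.environ["PEM_CERT"],
    }
    # The AvroProducer serializes keys and values against the given Avro schemas
    # and talks to the Schema Registry configured above.
    return AvroProducer(config,
                        default_key_schema=loads(key_schema_str),
                        default_value_schema=loads(value_schema_str))
```

You would then call produce(topic=..., key=..., value=...) followed by flush(), which is what the publishEvent method above does.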
Python Avro Consumer
In this section we describe the Python scripts we will be using in order to consume Avro messages from a Kafka topic.
Consume Message
The Python script that we will use to consume an Avro message from a Kafka topic is ConsumeAvroMessage.py, where we have the following:
A function to parse arguments:
# Parse arguments to get the container ID to poll for
def parseArguments():
    global TOPIC_NAME
    print("The arguments for the script are: " , str(sys.argv))
    if len(sys.argv) != 2:
        print("[ERROR] - The ConsumeAvroMessage.py script expects one argument: The Kafka topic to read events from.")
        exit(1)
    TOPIC_NAME = sys.argv[1]

The main where we will:
- Parse the arguments to get the topic to read from
- Create the Kafka Consumer and configure it
- Poll for next avro message
- Close the Kafka consumer
if __name__ == '__main__':
    # Parse arguments
    parseArguments()
    # Create the Kafka Avro consumer
    kafka_consumer = KafkaConsumer(KAFKA_BROKERS, SCRAM_USERNAME, SCRAM_PASSWORD, TOPIC_NAME, SCHEMA_REGISTRY_URL)
    # Prepare the consumer
    kafka_consumer.prepareConsumer()
    # Consume next Avro event
    kafka_consumer.pollNextEvent()
As you can see, this Python code depends on a Kafka Avro Consumer to consume messages. This is explained next.
Kafka Avro Consumer
This script, called KcAvroConsumer.py, is actually responsible for creating the Kafka Avro Consumer, initializing and configuring it, and providing the poll-next-event method:
Initialize and prepare the new Kafka consumer:
class KafkaConsumer:

    def __init__(self, kafka_brokers = "", scram_username = "", scram_password = "", topic_name = "", schema_registry_url = "", autocommit = True):
        self.kafka_brokers = kafka_brokers
        self.scram_username = scram_username
        self.scram_password = scram_password
        self.topic_name = topic_name
        self.schema_registry_url = schema_registry_url
        self.kafka_auto_commit = autocommit

Poll next event method:
    # Prints out the message
    def traceResponse(self, msg):
        print('[Message] - Next message consumed from {} partition: [{}] at offset {} with key {} and value {}'.format(msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value()))

    # Polls for next event
    def pollNextEvent(self):
        # Poll for messages
        msg = self.consumer.poll(timeout=10.0)
        ...
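The pollNextEvent body is also cut off. Here is a minimal sketch of preparing and polling such a consumer with confluent_kafka's AvroConsumer, using the same connection settings printed later in this lab; the helper below is illustrative, not the repository's exact code:

```python
import os
from confluent_kafka.avro import AvroConsumer

def build_avro_consumer(topic_name, group_id="pythonconsumers"):
    """Hypothetical helper: create an AvroConsumer subscribed to topic_name."""
    config = {
        "bootstrap.servers": os.environ["KAFKA_BROKERS"],
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": True,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": os.environ["SCRAM_USERNAME"],
        "sasl.password": os.environ["SCRAM_PASSWORD"],
        "ssl.ca.location": os.environ["PEM_CERT"],
        "schema.registry.url": os.environ["SCHEMA_REGISTRY_URL"],
        "schema.registry.ssl.ca.location": os.environ["PEM_CERT"],
    }
    consumer = AvroConsumer(config)
    consumer.subscribe([topic_name])
    return consumer

# Poll a single message; the AvroConsumer deserializes key and value using the
# schemas it retrieves from the Schema Registry.
consumer = build_avro_consumer("test-schema-user1")
msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())
consumer.close()
```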
Schemas and Messages
In this section we are going to see how Schema Registry works when you have an application that produces and consumes messages based on Avro data schemas. The application we are going to use for this is the python application presented above in the Python Avro Producer and Python Avro Consumer sections.
IMPORTANT: Before starting to use our Python application, we must set the PYTHONPATH environment variable to point to where we have all the Python scripts that make up our application, so that Python can find them at execution time.
- Set the PYTHONPATH variable to the location where you cloned the GitHub repository containing the Python application we are going to be working with
export PYTHONPATH=~/refarch-eda-tools/labs/es-cp4i-schema-lab-v10
Produce a message
In order to produce a message, we execute ProduceAvroMessage.py. This script, as you could see in the Python Avro Producer section, sends an event with key {"key": 1} and value {"message": "This is a test message"}, according to the schemas defined in default_key.avsc and default_value.avsc for the key and value of the event respectively.
python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": "This is a test message"}
We can see our new message delivered in the test-schema-user1 topic by:
Go into the topics page in the IBM Event Streams UI
Click on the topic and then on the Messages tab at the top. Finally, click on a message to see it displayed on the right hand side of the screen
IMPORTANT: As you can see, the test-schema-user1 topic got auto-created when we produced the message. The reason for this is that:
- Kafka is configured out of the box to let applications auto-create topics.
- We created the SCRAM credentials for our application to allow the application to create topics.
In a production-like environment, you don't want developers creating applications that auto-create topics in your IBM Event Streams instance without any control. For that, we would configure Kafka to forbid topic auto-creation (https://kafka.apache.org/documentation/#auto.create.topics.enable), as well as thoroughly craft the SCRAM credentials with the strictest appropriate permissions that our application needs.
IMPORTANT: Similar to the auto-creation of topics, we can see below that the Avro data schemas for both the key and the value of the message produced got auto-registered. This is because many client libraries come with a SerDes property that allows them to auto-register Avro data schemas (https://docs.confluent.io/current/clients/confluent-kafka-python/#avroserializer). However, in a production-like environment we don't want applications to auto-register schemas without any control, yet we cannot leave it to developers to turn the auto-registration property off in their libraries. Instead, we would create the SCRAM credentials with the strictest appropriate permissions that our application needs.
If we look now at the schemas our schema registry has:
cloudctl es schemas
Schema                           State    Latest version   Latest version ID   Updated
demoSchema_CLI_USER1             active   2.0.0            2                   Fri, 24 Jul 2020 14:09:37 UTC
demoSchema_UI_USER1              active   2.0.0            2                   Fri, 24 Jul 2020 14:06:27 UTC
test-schema-user1-key-d89uk      active   1                1                   Fri, 24 Jul 2020 15:41:46 UTC
test-schema-user1-value-tv5efr   active   1                1                   Fri, 24 Jul 2020 15:41:45 UTC
OK
we see two new schemas, test-schema-user1-key-d89uk and test-schema-user1-value-tv5efr, which in fact correspond to the Avro data schemas used for the key (default_key.avsc) and the value (default_value.avsc) of events sent to the test-schema-user1 topic by ProduceAvroMessage.py, as explained before sending the message.
To confirm this, we can inspect those schemas:
cloudctl es schema test-schema-user1-key-d89uk --version 1
{
  "type": "record",
  "name": "defaultKey",
  "namespace": "ibm.eda.default",
  "fields": [
    {
      "type": "int",
      ...

cloudctl es schema test-schema-user1-value-tv5efr --version 1
{
  "type": "record",
  "name": "defaultValue",
  "namespace": "ibm.eda.default",
  "fields": [
    {
      "type": "string",
      ...
If I now decided that my events should contain another attribute, I would modify the event value schema (default_value.avsc) to reflect that, as well as ProduceAvroMessage.py to send that new attribute in the event it sends:
python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": "This is a test message", "anotherAttribute": "Just another test string"}
I can see that an event with a new attribute has been sent:
And I can also see that the new schema got registered as well:
cloudctl es schemas
Schema                           State    Latest version   Latest version ID   Updated
demoSchema_CLI_USER1             active   2.0.0            2                   Fri, 24 Jul 2020 14:09:37 UTC
demoSchema_UI_USER1              active   2.0.0            2                   Fri, 24 Jul 2020 14:06:27 UTC
test-schema-user1-key-d89uk      active   1                1                   Fri, 24 Jul 2020 15:41:46 UTC
test-schema-user1-value-a5bbaa   active   1                1                   Fri, 24 Jul 2020 15:54:37 UTC
test-schema-user1-value-tv5efr   active   1                1                   Fri, 24 Jul 2020 15:41:45 UTC
OK
If I inspect that new schema, I see my new attribute in it:
cloudctl es schema test-schema-user1-value-a5bbaa --version 1
{
  "type": "record",
  "name": "defaultValue",
  "namespace": "ibm.eda.default",
  "fields": [
    {
      "type": "string",
      ...
Create a non-compliant message
Let’s see what happens if we send a message that does not comply with its Avro data schema. Let’s say that I send the following message:
key = {"key": 1}value = {"message" : 12345}
and this is the output of that attempt:
python3 ProduceAvroMessage.py test-schema-user1
@@@ Executing script: ProduceAvroMessage.py
The arguments for the script are: ['ProduceAvroMessage.py', 'test-schema-user1']
Creating event...
DONE
--- Event to be published: ---
{"key": 1}
{"message": 12345}
As we can see, the attempt fails because the Avro producer checks the message against the Avro data schema defined for the topic we want to send the message to and reports that this message does not comply (the message value attribute we are sending is an integer rather than a string, and we are missing the second attribute).
Therefore, using Avro schemas with IBM Event Streams gives us the ability to build our system with robustness, protecting downstream data consumers from malformed data, as only valid data will be permitted in the topic.
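The check happens inside the Avro serializer, and you can reproduce it in isolation with avro-python3. A minimal sketch, using an illustrative value schema with the field names from this lab:

```python
import io
import avro.io
import avro.schema

# Illustrative value schema with the field names used in this lab.
value_schema = avro.schema.SchemaFromJSONData({
    "type": "record", "name": "defaultValue", "namespace": "ibm.eda.default",
    "fields": [
        {"name": "message", "type": "string"},
        {"name": "anotherAttribute", "type": "string"}
    ]
}, avro.schema.Names())

bad_value = {"message": 12345}   # wrong type, and the second attribute is missing

# The DatumWriter validates the datum against the schema before encoding it.
writer = avro.io.DatumWriter(value_schema)
try:
    writer.write(bad_value, avro.io.BinaryEncoder(io.BytesIO()))
except avro.io.AvroTypeException as err:
    print("Rejected by the schema:", err)
```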
Consume a message
In order to consume a message, we execute ConsumeAvroMessage.py within the /tmp/lab/src folder in our Python demo environment:
python3 ConsumeAvroMessage.py test-schema-user1
@@@ Executing script: ConsumeAvroMessage.py
The arguments for this script are: ['ConsumeAvroMessage.py', 'test-schema-user1']
--- This is the configuration for the Avro consumer: ---
{'bootstrap.servers': 'es-1-kafka-bootstrap-integration.apps.eda-solutions.gse-ocp.net:443', 'group.id': 'pythonconsumers', 'auto.offset.reset': 'earliest', 'schema.registry.url': 'https://my-user1:*****@es-1-ibm-es-schema-external-integration.apps.eda-solutions.gse-ocp.net', 'enable.auto.commit': True, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'SCRAM-SHA-512', 'sasl.username': 'my-user1', 'sasl.password': '*****', 'ssl.ca.location': '/home/ALMARAZJ/es-cert.pem', 'schema.registry.ssl.ca.location': '/home/ALMARAZJ/es-cert.pem'}
---------------------------------------------------
[Message] - Next message consumed from test-schema-user1 partition: [0] at offset 0 with key {'key': 1} and value {'message': 'This is a test message'}
As you can see, our script was able to read the Avro messages from the test-schema-user1 topic and map them back to their original structure thanks to the Avro schemas:
[Message] - Next message consumed from test-schema partition: [0] at offset 0 with key {'key': 1} and value {'message': 'This is a test message'}
[Message] - Next message consumed from test-schema partition: [0] at offset 1 with key {'key': 1} and value {'message': 'This is a test message', 'anotherAttribute': 'Just another test string'}
Data Evolution
So far we have seen what Avro is, what an Avro data schema is, what a schema registry is, and how this all works together, from creating an Avro data schema for your messages/events to comply with, to how the schema registry and Avro data schemas work hand in hand. We have also seen the code for doing all this, from the Python code to send and receive Avro-encoded messages based on their Avro data schemas, to the rich CLI IBM Event Streams provides to interact with.
However, we have said little about the need for data to evolve. When you design an event-driven architecture for your application (by applying Event Storming or Domain-Driven Design, for example), it is very hard to come up with data structures/schemas that will not need to evolve or change over time. That is, your data, like your use or business cases, may need to evolve. As a result, Avro data schemas must be flexible enough to allow your data to evolve along with your application and use cases.
But it is not as easy as adding or removing data that travels in your events/messages, or modifying the type of such data. One of the reasons for this is that Kafka (or any other event backbone) is often used as the source of truth, that is, a place you can trust as to what has happened. Kafka will serve as the source of truth where all the events (that is, data) that happened (which could be bank transactions, communications, etc.) get stored, sometimes for many years, and can be replayed if needed. As a result, there must be data schema management and data schema evolution in place that allow the compatibility of old and new data schemas and, in fact, of old and new data.
The IBM Event Streams Schema Registry enforces full compatibility when creating a new version of a schema. Full compatibility means that old data can be read with the new data schema, and new data can also be read with the previous data schema.
In data formats like Avro, you can define fields with default values. In that case, adding or removing a field with a default value is a fully compatible change. Let's see what that means in terms of adding and removing attributes from your data schema.
Adding a new attribute
Although we have already seen this in the adding a new version of a schema section, let's try to add a new version of our test-schema-value schema with a new attribute. Remember, our default_value.avsc already contains an additional attribute compared to the original one, but it got registered as a new schema rather than as a new version of the original one. Let's reuse that Avro schema file to register it as a new version (INFO: you might need to copy/download that file to your local workstation in order to be able to upload it to IBM Event Streams through its UI).
When doing so from the UI, we see the following error:
The reason, as already explained in the add a new version of a schema section, is that full compatibility dictates that you can only add new attributes to a schema if they have a default value. The reason being that a receiver should be able to deserialize messages produced with an older schema using the newer schema. Because old messages were written with an older schema that did not contain our new attribute, those messages won't have that attribute, so we need to provide a default value for it in our newer version of the schema so that the receiver is able to deserialize those older messages with the newer schema.
If we add the default value for the new attribute, we see that our newer version is now compatible:
and that it gets registered fine:
Removing an existing attribute
What if we now wanted to remove the original message attribute from our schema? Let's remove it from the default_value.avsc file and try to register that new version:
We, again, get the same error. The reason is that receivers must be able to read and deserialize messages produced with the newer schema (that is, without the message attribute) using the older schema (that is, the schema version that enforces the existence of the message attribute).
In order to work around this, we first need to register an intermediate schema version that defines a default value for the message attribute:
Once we have a default value for the message attribute, we can register a new version of the schema that finally removes that attribute:
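To see why the intermediate version makes this change safe, here is a small avro-python3 sketch (illustrative schemas and default value) showing that an event produced without the message field can still be read by a consumer using the intermediate schema, which falls back to the default:

```python
import io
import avro.io
import avro.schema

# Latest schema (illustrative): the "message" field has been removed.
writer_schema = avro.schema.SchemaFromJSONData({
    "type": "record", "name": "defaultValue", "namespace": "ibm.eda.default",
    "fields": [{"name": "anotherAttribute", "type": "string"}]
}, avro.schema.Names())

# Intermediate schema: "message" is still declared, but now with a default value (illustrative).
reader_schema = avro.schema.SchemaFromJSONData({
    "type": "record", "name": "defaultValue", "namespace": "ibm.eda.default",
    "fields": [
        {"name": "message", "type": "string", "default": ""},
        {"name": "anotherAttribute", "type": "string"}
    ]
}, avro.schema.Names())

# Serialize an event with the latest schema (no "message" field in the payload).
buffer = io.BytesIO()
avro.io.DatumWriter(writer_schema).write({"anotherAttribute": "Just another test string"}, avro.io.BinaryEncoder(buffer))

# A consumer still using the intermediate schema can read it; "message" takes its default.
decoder = avro.io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
print(avro.io.DatumReader(writer_schema, reader_schema).read(decoder))
# {'anotherAttribute': 'Just another test string', 'message': ''}
```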
Security
As we have already mentioned during this tutorial, we need to pay attention to the permissions we give to users, groups, and applications (and therefore the clients they use to interact with IBM Event Streams), since we don't want everyone and everything to be, for instance, creating or deleting topics and schemas.
You can secure your IBM Event Streams resources in a fine-grained manner by managing the access each user and application has to each resource. Within IBM Event Streams, you can secure access to the following resource types, where the names in parentheses are the resource type names used in Access Control List (ACL) rules:
- Topics (topic): you can control the ability of users and applications to create, delete, read, and write to a topic.
- Consumer groups (group): you can control an application’s ability to join a consumer group.
- Transactional IDs (transactionalId): you can control the ability to use the transaction capability in Kafka.
Note: Schemas in the Event Streams Schema Registry are a special case and are secured using the resource type of topic combined with a prefix of __schema_. You can control the ability of users and applications to create, delete, read, and update schemas.
You can find more information about how to secure your IBM Event Streams resources in the official documentation at: https://ibm.github.io/event-streams/security/managing-access/