Streaming PostgreSQL Updates to Kafka with Debezium
This guide will help you get up and running with Kafka Connect to stream PostgreSQL database changes to a Kafka topic. It walks you through the installation and configuration of Kafka, Kafka Connect, Debezium, and PostgreSQL.
Note: This guide has only been tested using Docker Desktop for Mac. Results may vary using other Kubernetes cluster types.
This guide assumes that you have the following:
- Access to a Kubernetes Cluster
- Helm (w/ Tiller) installed on the Kubernetes Cluster
- Helm Incubator repository enabled
- cURL, Postman or another HTTP client
If you do not meet the prerequisites, please see the following links:
- Getting Started with Kubernetes with Docker on Mac
- Install Helm
- Enable Helm Incubator repository
- Install Postman
Initial Setup
To keep this tutorial isolated from other applications running in your Kubernetes cluster, and to make cleanup easier, we will create a separate namespace for the new resources.
$ kubectl create namespace kafka-connect-tutorial
(Optional) You may set your Kubernetes context's default namespace to `kafka-connect-tutorial` using the following command:
$ kubectl config set-context --current --namespace kafka-connect-tutorial
Install Kafka & Zookeeper
This section will guide you through the installation of Kafka & Zookeeper. You will also deploy a Kafka client pod to interact with the Kafka Cluster, as well as configure 3 Kafka Topics that will be used by Kafka Connect.
1. Install Kafka & Zookeeper to your namespace using the Incubator Helm Chart:

   $ helm install --name kafka --namespace kafka-connect-tutorial incubator/kafka --set external.enabled=true

2. Deploy a Kafka client container to your cluster by creating a file in your workspace named `kafka-client-deploy.yaml` with the following contents:

   ```yaml
   # kafka-client-deploy.yaml
   apiVersion: v1
   kind: Pod
   metadata:
     name: kafka-client
   spec:
     containers:
     - name: kafka-client
       image: confluentinc/cp-kafka:5.0.1
       command:
       - sh
       - -c
       - "exec tail -f /dev/null"
   ```

3. Execute the following command to deploy the Kafka client Pod:

   $ kubectl create -f kafka-client-deploy.yaml -n kafka-connect-tutorial

4. Create the Kafka Connect topics using the following commands:

   $ kubectl -n kafka-connect-tutorial exec kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic connect-offsets --create --partitions 1 --replication-factor 1
   $ kubectl -n kafka-connect-tutorial exec kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic connect-configs --create --partitions 1 --replication-factor 1
   $ kubectl -n kafka-connect-tutorial exec kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic connect-status --create --partitions 1 --replication-factor 1
Install Kafka Connect
This section will guide you through the installation of Kafka Connect using the Debezium Kafka Connect Docker Image. As part of this installation, you will create a NodePort service to expose the Kafka Connect API. This service will be available on port 30500 of your cluster nodes. If you are using Docker Desktop, this will be http://localhost:30500.
1. Create a file named `kafka-connect-deploy.yaml` in your workspace and add the following contents:

   ```yaml
   # kafka-connect-deploy.yaml
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: kafkaconnect-deploy
     labels:
       app: kafkaconnect
   spec:
     replicas: 1
     selector:
       matchLabels:
         app: kafkaconnect
     template:
       metadata:
         labels:
           app: kafkaconnect
       spec:
         containers:
         - name: kafkaconnect-container
           image: debezium/connect:0.10.0.CR1
           readinessProbe:
             httpGet:
               path: /
               port: 8083
           livenessProbe:
             httpGet:
               path: /
               port: 8083
           env:
           - name: BOOTSTRAP_SERVERS
             value: kafka:9092
           - name: GROUP_ID
             value: "1"
           - name: OFFSET_STORAGE_TOPIC
             value: connect-offsets
           - name: CONFIG_STORAGE_TOPIC
             value: connect-configs
           - name: STATUS_STORAGE_TOPIC
             value: connect-status
           ports:
           - containerPort: 8083
   ---
   apiVersion: v1
   kind: Service
   metadata:
     name: kafkaconnect-service
     labels:
       app: kafkaconnect-service
   spec:
     type: NodePort
     ports:
     - name: kafkaconnect
       protocol: TCP
       port: 8083
       nodePort: 30500
     selector:
       app: kafkaconnect
   ```

2. Deploy Kafka Connect with the following command:

   $ kubectl create -f kafka-connect-deploy.yaml --namespace kafka-connect-tutorial
Install PostgreSQL
This section will guide you through the installation of PostgreSQL using the Stable Helm Chart. You will also add some additional configuration for PostgreSQL necessary for Debezium to read the PostgreSQL transaction log. PostgreSQL will be available on port 30600 of your cluster nodes. If you are using Docker Desktop, this will be http://localhost:30600.
1. Create the PostgreSQL configuration necessary for Debezium. Create a file in your workspace named `extended.conf` with the following contents:

   ```
   # extended.conf
   wal_level = logical
   max_wal_senders = 1
   max_replication_slots = 1
   ```

2. Create a ConfigMap from the `extended.conf` file with the following command:

   $ kubectl create configmap --namespace kafka-connect-tutorial --from-file=extended.conf postgresql-config

3. Install PostgreSQL using the Stable Helm Chart with the following command:

   $ helm install --name postgres --namespace kafka-connect-tutorial --set extendedConfConfigMap=postgresql-config --set service.type=NodePort --set service.nodePort=30600 --set postgresqlPassword=passw0rd stable/postgresql
Add Sample Data to PostgreSQL
1. Open a shell in the Postgres container:

   $ kubectl exec --namespace kafka-connect-tutorial -it postgres-postgresql-0 -- /bin/sh

2. Log in to Postgres with the following command, entering the password `passw0rd` when prompted:

   $ psql --user postgres

3. Create a table named `containers`:

   ```sql
   CREATE TABLE containers (
       containerid VARCHAR(30) NOT NULL,
       type VARCHAR(20),
       status VARCHAR(20),
       brand VARCHAR(50),
       capacity DECIMAL,
       creationdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
       updatedate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
       PRIMARY KEY (containerid)
   );
   ```

4. Insert data into the table:

   ```sql
   INSERT INTO containers (containerid, type, status, brand, capacity) VALUES
       ('C01', 'Reefer', 'Operational', 'containerbrand', 20),
       ('C02', 'Dry', 'Operational', 'containerbrand', 20),
       ('C03', 'Dry', 'Operational', 'containerbrand', 40),
       ('C04', 'FlatRack', 'Operational', 'containerbrand', 40),
       ('C05', 'OpenTop', 'Operational', 'containerbrand', 40),
       ('C06', 'OpenSide', 'Operational', 'containerbrand', 40),
       ('C07', 'Tunnel', 'Operational', 'containerbrand', 40),
       ('C08', 'Tank', 'Operational', 'containerbrand', 40),
       ('C09', 'Thermal', 'Operational', 'containerbrand', 20);
   ```
Configure the Debezium PostgreSQL connector
This section will show you how to configure the Debezium PostgreSQL connector.
- Using your HTTP client (cURL shown), make the following request to the Kafka Connect API. This configures a new Debezium PostgreSQL connector, which uses the `pgoutput` logical decoding plug-in to capture operations on the `containers` table.

  Note: If you are not using Docker Desktop, replace `localhost` with the hostname/IP of one of your cluster nodes.

  Note: If you did not follow the Add Sample Data to PostgreSQL section, replace `"public.containers"` with the name of your table.

  ```sh
  curl -X POST http://localhost:30500/connectors \
    -H 'Content-Type: application/json' \
    -d '{
      "name": "containers-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres-postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "passw0rd",
        "database.dbname": "postgres",
        "database.server.name": "postgres",
        "table.whitelist": "public.containers"
      }
    }'
  ```
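After registering the connector, you can confirm it is healthy by querying the Kafka Connect REST API's status endpoint. Here is a minimal sketch in Python using only the standard library; the URL assumes the NodePort service from this guide (`http://localhost:30500`), and `is_running` is a hypothetical helper, not part of any Kafka Connect client:

```python
import json
import urllib.request

def get_connector_status(name, connect_url="http://localhost:30500"):
    """Fetch a connector's status document from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.loads(resp.read())

def is_running(status):
    """True when the connector and all of its tasks report the RUNNING state."""
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status.get("tasks", []))

if __name__ == "__main__":
    status = get_connector_status("containers-connector")
    print("connector healthy:", is_running(status))
```

A healthy response reports `RUNNING` for both the connector and each of its tasks; a `FAILED` task state usually means the database connection or replication-slot setup needs a second look.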
- List the Kafka topics, showing your newly created topic:

  $ kubectl -n kafka-connect-tutorial exec kafka-client -- kafka-topics --zookeeper kafka-zookeeper:2181 --list

- Tail the Kafka `postgres.public.containers` topic to show database transactions being written to the topic by Kafka Connect.

  Note: Change `postgres.public.containers` if you are not using the sample database data.

  $ kubectl -n kafka-connect-tutorial exec kafka-client -- kafka-console-consumer --topic postgres.public.containers --from-beginning --bootstrap-server kafka:9092

You may continue to make create, update, and delete transactions against the `containers` table; these changes will appear as messages in the Kafka topic.
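Each message on the topic is a Debezium change-event envelope whose `payload` carries `before`, `after`, and `op` fields (`c` = create, `u` = update, `d` = delete, `r` = snapshot read). The sketch below shows how a consumer might summarize one of these messages; the sample event is illustrative (trimmed, and not captured from a real run — actual events also include a `schema` section and a `source` block):

```python
import json

# Illustrative Debezium-style change event for an insert into `containers`.
sample_event = json.dumps({
    "payload": {
        "before": None,
        "after": {"containerid": "C01", "type": "Reefer",
                  "status": "Operational", "brand": "containerbrand",
                  "capacity": 20},
        "op": "c",
    }
})

def describe_change(raw):
    """Summarize a Debezium change event as (operation name, affected row)."""
    payload = json.loads(raw)["payload"]
    ops = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}
    # Deletes carry the row image in `before`; everything else uses `after`.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return ops.get(payload["op"], payload["op"]), row

op, row = describe_change(sample_event)
print(op, row["containerid"])  # create C01
```

The same pattern applies to updates (both `before` and `after` populated) and deletes (`after` is null), which is what makes the topic usable as a full changelog of the table.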
Cleanup
This section will help you remove all of the resources created during this tutorial.
- Delete the Kafka Helm release:

  $ helm delete kafka --purge

- Delete the PostgreSQL Helm release:

  $ helm delete postgres --purge

- Delete the `kafka-connect-tutorial` namespace:

  $ kubectl delete namespace kafka-connect-tutorial