
Kafka Security Overview

Review this video for a refresher on SSL and TLS certificates, and keep in mind the two points the speaker makes:

  • Any message encrypted with Bob’s public key can only be decrypted with Bob’s private key.
  • Anyone with access to Alice’s public key can verify that a message could only have been created by someone with access to Alice’s private key.
(Figure: TLS overview)

For a deeper dive into security administration, see the Confluent article, the product documentation, and Rick’s blog posts Part 1 and Part 2.

To connect to Kafka using the Kafka API

The important Kafka client application settings are:

  • the security.protocol, which should match the listeners configured in the Kafka cluster. The valid values are:
PLAINTEXT (using the PLAINTEXT transport layer & no authentication - the default value)
SSL (using the SSL transport layer & certificate-based authentication)
SASL_PLAINTEXT (using the PLAINTEXT transport layer & SASL-based authentication)
SASL_SSL (using the SSL transport layer & SASL-based authentication)

Here is an example from our quickstarts of Quarkus properties for a Kafka Java API:
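The sketch below is illustrative rather than the exact quickstart content: it assumes a SASL_SSL listener with SCRAM-SHA-512, the SmallRye Reactive Messaging Kafka connector, and made-up channel and environment variable names (orders, KAFKA_BROKERS, KAFKA_USER, ...).

# application.properties - connection and security settings read by the Quarkus Kafka clients
kafka.bootstrap.servers=${KAFKA_BROKERS}
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=SCRAM-SHA-512
kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASSWORD}";
kafka.ssl.truststore.location=${KAFKA_CERT_LOCATION}
kafka.ssl.truststore.password=${KAFKA_CERT_PASSWORD}
# A reactive messaging channel bound to a Kafka topic
mp.messaging.outgoing.orders.connector=smallrye-kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer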

Understand the Kafka cluster listeners

In Event Streams, the following YAML defines the Kafka listeners used for the different channels. On port 9092 the connection is plain, with no TLS encryption.

listeners:
  - name: plain
    port: 9092
    type: internal
    tls: false
  - name: tls
    port: 9093
    type: internal
    tls: true

The tls boolean controls traffic encryption, while authentication.type defines the matching security protocol.

Port 9093 uses mutual TLS authentication with TLS-encrypted communication, while port 9094 uses SCRAM authentication with TLS-encrypted communication, as sketched below.
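A sketch of listeners carrying these authentication types (the listener names and the route type for the external listener on 9094 are assumptions; adapt them to your own cluster definition):

listeners:
  - name: tls
    port: 9093
    type: internal
    tls: true
    authentication:
      type: tls
  - name: external
    port: 9094
    type: route
    tls: true
    authentication:
      type: scram-sha-512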

Security configuration

  • ssl.truststore.location and ssl.truststore.password: when using TLS encryption, we need to provide our Kafka clients with the location of a trusted Certificate Authority certificate. This file is often provided by the Kafka administrator and is generally unique to the specific Kafka cluster deployment. The certificate is in JKS or PKCS12 format for JVM languages, and PEM or P12 for Node.js or Python. With Strimzi or Event Streams, the deployment includes a self-signed certificate in a secret named after the Kafka cluster ({kafka-cluster-name}-cluster-ca-cert).

Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate.
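With Strimzi or Event Streams, the cluster CA lives in that {kafka-cluster-name}-cluster-ca-cert secret. As a sketch, assuming the default key names in that secret (ca.crt, ca.p12, ca.password), it can be extracted with oc:

# PEM certificate, e.g. for Node.js or Python clients
oc get secret {kafka-cluster-name}-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
# PKCS12 truststore and its password, e.g. for JVM clients
oc get secret {kafka-cluster-name}-cluster-ca-cert -o jsonpath='{.data.ca\.p12}' | base64 -d > ca.p12
oc get secret {kafka-cluster-name}-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 -d > ca.password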

To extract a PEM-based certificate from a JKS-based truststore, you can use the following command:

keytool -exportcert -alias {certificate-alias} -storepass {truststore-password} -keystore {provided-kafka-truststore.jks} -rfc -file {desired-kafka-cert-output.pem}

To build a PKCS12 truststore from a PEM certificate:

openssl pkcs12 -export -nokeys -in cert.pem -out cert.p12
# if you want a JKS truststore instead
keytool -importkeystore -srckeystore cert.p12 -srcstoretype pkcs12 -destkeystore cert.jks

  • the sasl.mechanism, which selects the SASL authentication mechanism. The valid values are:
PLAIN (cleartext passwords, although they will be encrypted across the wire per the security.protocol settings above)
SCRAM-SHA-512 (modern Salted Challenge Response Authentication Mechanism)
GSSAPI (Kerberos-supported authentication, and the default if not specified otherwise)
  • for Java-based applications, the sasl.jaas.config string is one of the following, depending on the sasl.mechanism:
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="{USERNAME}" password="{PASSWORD}";

For an external connection to a Strimzi cluster, use the following properties, where USERNAME is a SCRAM user:

bootstrap.servers={kafka-cluster-name}-kafka-bootstrap-{namespace}.{kubernetes-cluster-fully-qualified-domain-name}:443
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{USERNAME}" password="{PASSWORD}";
ssl.truststore.location={/provided/to/you/by/the/kafka/administrator}
ssl.truststore.password={__provided_to_you_by_the_kafka_administrator__}
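
For illustration, here is a minimal Java sketch that loads these properties and consumes messages; the file name client.properties, the group id, and the topic name are assumptions.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SecureConsumer {
    public static void main(String[] args) throws Exception {
        // Load the bootstrap and security settings shown above
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("client.properties")) {
            props.load(in);
        }
        // Deserializers and group id are not security settings, so add them here
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
    }
}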

To get the user password, read the user secret:

oc get secret scram-user -o jsonpath='{.data.admin_password}' | base64 --decode && echo ""

To get the Bootstrap URL use:

export K_CLUSTER_NAME=mycluster
export BOOTSTRAP="$(oc get route ${K_CLUSTER_NAME}-kafka-bootstrap -o jsonpath='{.spec.host}'):443"

The sasl.jaas.config can come from an environment variable backed by a secret; in fact, Strimzi already defines it in the SCRAM user secret:

oc get secret my-user -o json | jq -r '.data["sasl.jaas.config"]' | base64 -d -
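
For example, to expose it to the application as an environment variable, a sketch of the container spec (the variable name is illustrative, the secret name my-user comes from the command above) would be:

env:
  - name: KAFKA_SASL_JAAS_CONFIG
    valueFrom:
      secretKeyRef:
        name: my-user
        key: sasl.jaas.config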
  • For internal communication with PLAIN, the settings are:
bootstrap.servers={kafka-cluster-name}-kafka-bootstrap.{namespace}.svc.cluster.local:9092
security.protocol = SASL_PLAINTEXT (these clients do not require SSL-based encryption as they are local to the cluster)
sasl.mechanism = PLAIN
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";
  • For internal authentication with mutual TLS, the certificates are mounted into the pod and the settings are:
bootstrap.servers={kafka-cluster-name}-kafka-bootstrap.{namespace}.svc.cluster.local:9093
security.protocol=SSL
ssl.truststore.location=/deployments/certs/server/ca.p12
ssl.truststore.password={__provided_to_you_by_kafka_administrator__}
ssl.keystore.location=/deployments/certs/user/user.p12
ssl.keystore.password={__extracted_from_generated_kafka_user_secret_with_key=user.password__}
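
The keystore password can be read from the generated KafkaUser secret; the sketch below assumes the Strimzi default key names (user.p12 and user.password) and the tls-user name used elsewhere on this page:

oc get secret tls-user -o jsonpath='{.data.user\.password}' | base64 -d && echo ""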

Remember that if the application does not run in the same namespace as the Kafka cluster, copy the secrets with something like:

if [[ -z $(oc get secret ${TLS_USER} 2> /dev/null) ]]
then
# As the project is personal to the user, we can keep a generic name for the secret
oc get secret ${TLS_USER} -n ${KAFKA_NS} -o json | jq -r '.metadata.name="tls-user"' | jq -r '.metadata.namespace="'${YOUR_PROJECT_NAME}'"' | oc apply -f -
fi
if [[ -z $(oc get secret ${SCRAM_USER} 2> /dev/null) ]]
then
# As the project is personal to the user, we can keep a generic name for the secret
oc get secret ${SCRAM_USER} -n ${KAFKA_NS} -o json | jq -r '.metadata.name="scram-user"' | jq -r '.metadata.namespace="'${YOUR_PROJECT_NAME}'"' | oc apply -f -
fi

Kafka Connect

For a Kafka Connect cluster, you need to define the authentication used to connect to the Kafka cluster:

authentication:
  type: tls
  certificateAndKey:
    secretName: tls-user
    certificate: user.crt
    key: user.key
  • Get the TLS public cluster certificate:
tls:
  trustedCertificates:
    - secretName: dev-cluster-ca-cert
      certificate: ca.crt
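
Putting the two fragments together, here is a sketch of a KafkaConnect custom resource; the apiVersion, resource name, and bootstrap address are assumptions to adapt to your deployment:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  authentication:
    type: tls
    certificateAndKey:
      secretName: tls-user
      certificate: user.crt
      key: user.key
  tls:
    trustedCertificates:
      - secretName: dev-cluster-ca-cert
        certificate: ca.crt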