
Kafka Streams Test Lab 1

Updated 03/10/2022

Overview

  • In this lab scenario we continue using the Apache Kafka Streams TopologyTestDriver to test a Topology, a Stream, and a Table.
  • While using the TestDriver we will perform operations such as groupBy and joins with another Stream or Kafka Table.

The code for this lab is in the eda-kstreams-labs repository, in the LoadKtableFromTopic folder.

Scenario Prerequisites

Java

  • For the purposes of this lab we suggest Java 11+

Maven

  • Maven will be needed to bootstrap and run our application from the command line.

An IDE of your choice

  • Ideally an IDE that supports Quarkus (such as Visual Studio Code)

Setting up the Quarkus Application

  • We will bootstrap the Quarkus application with the following Quarkus CLI command:
quarkus create LoadKtableFromTopic

You can change the application name (or supply a full groupId:artifactId coordinate) as you like.
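For example, a fully qualified form might look like the following; the group id shown here is only a placeholder, so substitute your own:

quarkus create app ibm.eda.kstreams:LoadKtableFromTopic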

  • Since we will be using the Kafka Streams testing functionality we will need to edit the pom.xml to add the dependency to our project. Open pom.xml and add the following.
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>3.1.0</version>
</dependency>

Creating your Test Class

  • Open the TestLoadKtableFromTopic.java file and paste the following content.
package ibm.eda.kstreams.lab1;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
  • The above code uses the TopologyTestDriver to mimic a Topology. A Topology is essentially a graph of stream processors (nodes) connected by streams (edges). In the first section we instantiate our TopologyTestDriver, named testDriver, along with the topic name and store name, as sketched below.
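The imports shown above hint at the setup that follows them. A minimal sketch of that configuration is shown here; the application id, topic name, and store name are assumptions for illustration, so check them against the lab repository:

// Configuration for the TopologyTestDriver; the values below are assumed, not the lab's exact ones.
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstreams-lab1");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

String companySectorsTopic = "company-sectors"; // assumed topic name
String storeName = "company-sector-store";      // assumed store name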

  • Test the application by running the following:

./mvnw clean verify
  • You should see the tests pass with the following output:
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 14:20:26,488 INFO [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.089s. Listening on: http://localhost:8081
2021-01-16 14:20:26,490 INFO [io.quarkus] (main) Profile test activated.
2021-01-16 14:20:26,490 INFO [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.096 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestLoadKtableFromTopic
  • How this test topology creation flow works (see the sketch after this list):

    • A StreamsBuilder object (builder) from the Kafka Streams DSL API is created.
    • A KeyValueBytesStoreSupplier (storeSupplier) is configured with the String variable storeName.
    • A KTable is created by reading from the topic (companySectorsTopic), deserialized, and materialized as the previously created storeSupplier.
    • A TopologyTestDriver (testDriver) is built from the provided config properties and the KTable within the builder topology.
    • Lastly, a test input topic (inTopic) is created from the testDriver topology.
    • When inTopic.pipeInput("C01","Health Care"); is invoked, it populates the topic, which then populates the KTable which ultimately persists in a KeyValue State Store.
  • You should see the tests pass. These are three simple tests: the first checks that the value fetched from the Kafka Table is not null, the second makes sure that the value retrieved for key C02 is equal to Finance, and lastly we make sure that our state store (which was populated by way of the Kafka topic) indeed has six key-value pairs.
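Putting those steps together, a sketch of how the test body might look is shown below. It reuses the config, companySectorsTopic, and storeName variables from the earlier sketch; the full test (including all six key-value pairs and the exact assertions) lives in the lab repository, so treat this as an illustration rather than the lab's verbatim code.

// Additional imports this sketch assumes, beyond those shown earlier:
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;

// 1. StreamsBuilder and a state store supplier named after storeName
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName);

// 2. KTable reading from companySectorsTopic, materialized into the store
KTable<String, String> sectors = builder.table(companySectorsTopic,
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.as(storeSupplier));

// 3. TopologyTestDriver built from the topology and the test configuration
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config);

// 4. Test input topic created from the driver
TestInputTopic<String, String> inTopic = testDriver.createInputTopic(companySectorsTopic,
        new StringSerializer(), new StringSerializer());

// 5. Piping records populates the topic, the KTable, and ultimately the key-value state store
inTopic.pipeInput("C01", "Health Care");
inTopic.pipeInput("C02", "Finance");

// Assertions along the lines described above
KeyValueStore<String, String> store = testDriver.getKeyValueStore(storeName);
Assertions.assertNotNull(store.get("C01"));
Assertions.assertEquals("Finance", store.get("C02"));
testDriver.close();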

More Robust Kafka Streams Testing

  • Add JSON-B Serdes support using the Quarkus Kafka client extension:

    quarkus ext add kafka-client jsonb
  • Now that we have tested some simple functionality using the Kafka Streams API, let’s check out some other operators that we can use.

  • Let’s create a new class for our Plain Old Java Object (POJO) named FinancialMessage and copy and paste the following content into the newly created file.

package ibm.eda.kstreams.lab.domain;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class FinancialMessage implements JSONSerdeCompatible {
public String userId;
public String stockSymbol;

Note: We have not provided any accessors (getters) or mutators (setters) for simplicity. You can add them at your own discretion.
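The snippet above is only an excerpt of the class. Based on the constructor call used later in this lab, a plausible full version is sketched below; the field names other than userId, stockSymbol, and totalCost are assumptions, so verify them against the lab repository:

package ibm.eda.kstreams.lab.domain;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class FinancialMessage implements JSONSerdeCompatible {

    public String userId;
    public String stockSymbol;
    public String exchangeId;            // assumed field name
    public int quantity;                 // assumed field name
    public double stockPrice;            // assumed field name
    public double totalCost;             // used by the filter later in this lab
    public int institutionId;            // assumed field name
    public int countryId;                // assumed field name
    public boolean technicalValidation;  // assumed field name

    public FinancialMessage() {
        // no-arg constructor needed for JSON deserialization
    }

    public FinancialMessage(String userId, String stockSymbol, String exchangeId,
                            int quantity, double stockPrice, double totalCost,
                            int institutionId, int countryId, boolean technicalValidation) {
        this.userId = userId;
        this.stockSymbol = stockSymbol;
        this.exchangeId = exchangeId;
        this.quantity = quantity;
        this.stockPrice = stockPrice;
        this.totalCost = totalCost;
        this.institutionId = institutionId;
        this.countryId = countryId;
        this.technicalValidation = technicalValidation;
    }
}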

Add the following interface for the JSON Serde:

package ibm.eda.kstreams.lab.domain;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
 * An interface for registering types that can be de/serialized with {@link JSONSerde}.
 */
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
@JsonSubTypes({
    @JsonSubTypes.Type(value = FinancialMessage.class, name = "fm")
})
public interface JSONSerdeCompatible {
}
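The interface above references a JSONSerde class, which is part of the lab repository. A minimal sketch of what such a Jackson-based serde might look like is shown here; this is an assumption of its shape, not the lab's exact code:

package ibm.eda.kstreams.lab.domain;

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical JSON serde combining Serializer, Deserializer, and Serde in one class.
public class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // The _t property added by @JsonTypeInfo tells Jackson which concrete class to build
            return (T) OBJECT_MAPPER.readValue(data, JSONSerdeCompatible.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return OBJECT_MAPPER.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() { }

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }
}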
  • Now that we have our new Java class, let’s create a new and separate Java Test class: src/test/java/ibm/eda/kstreams/lab1/TestFinancialMessage.java.

Copy the contents below:

package ibm.eda.kstreams.lab1;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
  • We now have the setup for the test topology. Next, we can add a test that inserts two events into the topic. Add the following code to your test class:
@Test
public void shouldHaveOneTransaction() {
// A FinancialMessage is mocked and set to the input topic. Within the Topology,
// this gets sent to the outTopic because a userId exists for the incoming message.
FinancialMessage mock = new FinancialMessage(
"1", "MET", "SWISS", 12, 1822.38, 21868.55, 94, 7, true
);
FinancialMessage mock2 = new FinancialMessage(
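        // NOTE: the argument values below are assumptions completing the excerpt above;
        // the actual second message lives in the lab repository. Its totalCost is below 5000,
        // which matters for a later step in this lab.
        "2", "AAPL", "NASDAQ", 2, 1500.25, 3000.50, 42, 3, true
);

// Pipe both messages into the input topic, then check the count store.
inTopic.pipeInput("T01", mock);
inTopic.pipeInput("T02", mock2);

// storeName is assumed to be the store configured in the test setup.
KeyValueStore<String, Long> store = testDriver.getKeyValueStore(storeName);
Assertions.assertEquals(1, store.approximateNumEntries());
}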
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:21:37,836 INFO [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 1.996s. Listening on: http://localhost:8081
2021-01-16 17:21:37,837 INFO [io.quarkus] (main) Profile test activated.
2021-01-16 17:21:37,838 INFO [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.234 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage
  • We see that our recently added test failed. This is expected, because we inserted two records while our test expects one. To fix the test, change Assertions.assertEquals(1, store.approximateNumEntries()); so that it compares against 2 instead of 1.

  • Next let’s add another very simple test. Copy the following code to your Java test class:

@Test
public void testErrorTopicIsNotEmpty() {
    FinancialMessage mock = new FinancialMessage(
            null, "MET", "SWISS", 12, 1822.38, 21868.55, 94, 7, true
    );

    inTopic.pipeInput("T03", mock);
    Assertions.assertFalse(errorTopic.isEmpty());
}
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:29:34,258 INFO [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.470s. Listening on: http://localhost:8081
2021-01-16 17:29:34,260 INFO [io.quarkus] (main) Profile test activated.
2021-01-16 17:29:34,260 INFO [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.694 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage

As you can see here, our message payload is created with null for the userId field, which means this message will be routed to the errorTopic. The purpose of the test is to check whether our errorTopic is empty, which it should not be. Since errorTopic.isEmpty() resolves to false and our assertion asserts that it is false, the test passes.
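The errorTopic routing comes from the branching defined in the test topology setup (not shown above). A sketch of how such branching might be defined with the branch operator is shown here; the variable names builder, inTopicName, errorTopicName, and financialMessageSerde, as well as the predicate order, are assumptions inferred from how branches[1] is used below:

// Hypothetical branching: branches[0] holds invalid messages (no userId), branches[1] the valid ones.
@SuppressWarnings("unchecked")
KStream<String, FinancialMessage>[] branches = builder
        .stream(inTopicName, Consumed.with(Serdes.String(), financialMessageSerde))
        .branch(
            (key, value) -> value.userId == null,   // branch 0: invalid -> error topic
            (key, value) -> value.userId != null    // branch 1: valid -> further processing
        );

branches[0].to(errorTopicName, Produced.with(Serdes.String(), financialMessageSerde));

Newer Kafka Streams code would typically use split() and Branched instead of the array-returning branch(), but the array style matches the branches[1] usage in this lab.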

  • Now that we have two simple tests, let’s update our branching logic to filter the stream on a condition that we want. Let’s edit our branches[1] statement so that it retains only the records where the totalCost is greater than 5000.
branches[1]
    .filter(
        (key, value) -> (value.totalCost > 5000)
    )
    .groupBy(
        (key, value) -> value.userId
    )
    .count(
        Materialized.as(storeSupplier)
    );
  • Test the application by running the following:
./mvnw clean verify
  • You should see the following output:
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.ibm.GreetingResourceTest
2021-01-16 17:40:50,765 INFO [io.quarkus] (main) Quarkus 1.10.5.Final on JVM started in 2.102s. Listening on: http://localhost:8081
2021-01-16 17:40:50,766 INFO [io.quarkus] (main) Profile test activated.
2021-01-16 17:40:50,766 INFO [io.quarkus] (main) Installed features: [cdi, kafka-streams, resteasy, resteasy-jsonb]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.474 s - in com.ibm.GreetingResourceTest
[INFO] Running com.ibm.garage.cpat.lab.TestFinancialMessage

We see that our first test is now failing again. This is expected, because we changed the logic of branches[1] to filter out transactions with a totalCost of 5000 or less, which causes the second record we send in to be filtered out. To fix the test, either decrease the expected number of entries in our store back to 1, or modify the amount of the second transaction so that it is greater than 5000. Once we do that and run the tests again, all tests should pass.
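Either remedy is a one-line change; for example (the message values shown are assumptions):

// Option 1: expect a single entry in the store, since only one record survives the filter.
Assertions.assertEquals(1, store.approximateNumEntries());

// Option 2: make the second mocked message's totalCost exceed 5000 so it passes the filter.
FinancialMessage mock2 = new FinancialMessage(
        "2", "AAPL", "NASDAQ", 4, 1500.25, 6001.00, 42, 3, true
);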

Next Steps

  • Now that you have finished this initial part of Lab 1, you can optionally proceed to Lab 2.