Skip to content

Lab 2 solution

The output inventory class

The class needs to keep store name and a map of items and current inventory. The class is StoreInventory

public class StoreInventory  {

    public String storeName;
    // map <item_id,quantity>
    public HashMap<String,Long> stock = new HashMap<String,Long>();

This class is used to get to the out topic but inside the Store and aggregate via the update method:

public StoreInventory updateStockQuantity(String key, ItemTransaction newValue) {
    this.storeName = key;
    if (newValue.type != null && ItemTransaction.SALE.equals(newValue.type))
        newValue.quantity=-newValue.quantity;
    return this.updateStock(newValue.sku,newValue.quantity);
}

public StoreInventory updateStock(String sku, long newV) {
    if (stock.get(sku) == null) {
        stock.put(sku, Long.valueOf(newV));
    } else {
        Long currentValue = stock.get(sku);
        stock.put(sku, Long.valueOf(newV) + currentValue );
    }
    return this;
}

Developing the Topology in test class

Continuing test with the TopolofyTestDriver, you will implement the topology with the same structure as before. Here what the topology needs to do:

  • Get ItemTransaction from input stream the Key being the storeName
  • Aggregation wwork on keyed group, so groupByKey the input records
  • Aggregate using the update method.

The stream topology looks like:

    KStream<String,ItemTransaction> items = builder.stream(inTopicName, 
            Consumed.with(Serdes.String(),  
            StoreSerdes.ItemTransactionSerde()));  
    // 2 processing   
    // process items and aggregate at the store level 
    KTable<String,StoreInventory> storeItemInventory = items
        // use store name as key, which is what the item event is also using
        .groupByKey()
        // update the current stock for this <store,item> pair
        // change the value type
        .aggregate(
            () ->  new StoreInventory(), // initializer when there was no store in the table
            (store , newItem, existingStoreInventory) 
                -> existingStoreInventory.updateStockQuantity(store,newItem), 
                Materialized.<String, 
                        StoreInventory, 
                        KeyValueStore<Bytes, byte[]>>as(STORE_INVENTORY_KAFKA_STORE_NAME)
                        .withKeySerde(Serdes.String())
                        .withValueSerde( StoreSerdes.StoreInventorySerde())
                );   
    // Generate to output topic
    storeItemInventory.toStream().to(outTopicName,
            Produced.with(Serdes.String(), StoreSerdes.StoreInventorySerde()));

The full application code analysis

In fact the topology creation is defined in a business service. The microservice application is using the Liberty runtime and API and the code organization uses the onion architecture introduced in the Domain-driven design:

* `domain` contains the business logic and business entities related to item transaction and store inventory.
* `infra` is for infrastructure code, containing JAXRS class, event processing, and ser-des.
                ├── app
                │   └── StoreAggregatorApplication.java
                ├── domain
                │   ├── ItemTransaction.java
                │   ├── StoreInventory.java
                │   └── StoreInventoryAggregator.java
                └── infra
                    ├── api
                    │   ├── StoreInventoryQueries.java
                    │   ├── StoreInventoryResource.java
                    │   ├── VersionResource.java
                    │   └── dto
                    │       ├── InventoryQueryResult.java
                    │       ├── ItemCountQueryResult.java
                    │       └── PipelineMetadata.java
                    └── events
                        ├── ItemProcessingAgent.java
                        ├── JSONSerde.java
                        ├── KafkaConfig.java
                        ├── KafkaPropertiesUtil.java
                        └── StoreSerdes.java

The topology is in the Domain layer in the StoreInventoryAggregator classhttps://github.com/ibm-cloud-architecture/eda-tech-academy/blob/main/lab2/refarch-eda-store-inventory/src/main/java/ibm/gse/eda/stores/domain/StoreInventoryAggregator.java.

The Topology is started in a thread in the ItemProcessingAgent class when the application starts, by looking at the StartupEvent

    void onStart(@Observes StartupEvent ev){
        this.kafkaStreams = initializeKafkaStreams();
        logger.info("ItemProcessingAgent started");
     }