Vaccine Cold Chain Monitoring
Overview
The reefer monitoring agent consumes telemetry events from the telemetries Kafka topic, processes them with a stateful operation over a time window, and produces a reefer-cold-chain-violation event to the reefer Kafka topic when the temperature stays above a defined threshold for a given time period. For each received metric it can, optionally, call an anomaly detection service to compute the risk of failure.
In case of a cold chain violation, the impacted vaccine lots need to be reported as spoiled via new records logged to the Hyperledger blockchain. This is the responsibility of the reefer manager microservice, as it has visibility of the vaccine lots loaded in each container. Telemetry events carry information about the sensors and the geolocation of the reefer.
GitHub repository: vaccine-monitoring-agent
Kafka topics consumed from: telemetries as defined by:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: telemetries
  labels:
    strimzi.io/cluster: event-streams
spec:
  partitions: 10
  replicas: 3
Kafka topics produced to: reefers in case of anomaly detection or cold chain violation
Events reacted to: telemetry events like:
public class TelemetryEvent {
    public String containerID;
    public Telemetry payload;
    public String timestamp;
    public String type;
}
and the payload:
public class Telemetry {
    public String container_id;
    public String measurement_time;
    public String product_id;
    public double temperature;
    public double target_temperature;
    public double ambiant_temperature;
    public double kilowatts;
    public double time_door_open;
    public int content_type;
}
Events produced: reefer anomaly detected and reefer cold chain violated
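For illustration only, such a produced event could carry the container id, the event type, and the aggregated readings that triggered it; the ReeferEvent class and field names below are assumptions for this sketch and may differ from the actual classes in the repository.

// Hypothetical sketch of an event published to the reefer topic.
// Names are illustrative assumptions, not the repository's actual classes.
public class ReeferEvent {
    public static final String COLD_CHAIN_VIOLATED = "ReeferColdChainViolated";
    public static final String ANOMALY_DETECTED = "ReeferAnomalyDetected";

    public String containerID;     // record key on the reefer topic
    public String type;            // one of the two constants above
    public String timestamp;       // when the condition was assessed
    public double maxTemperature;  // highest temperature observed so far
}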
Code structure
The API is supported by the ContainerResource class, which exposes an interactive query keyed on the container id.
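As a sketch of how such an interactive query can be exposed with the Quarkus Kafka Streams extension, the resource below looks up a per-container aggregate from a state store; the REST path, the store name reefer-aggregates, and the method name are assumptions, not necessarily what ContainerResource implements.

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch only: path and store name are assumptions, not the repository's actual values.
@Path("/containers")
public class ContainerResource {

    @Inject
    KafkaStreams streams;   // injected by the quarkus-kafka-streams extension

    @GET
    @Path("/{containerID}")
    public Response getAggregate(@PathParam("containerID") String containerID) {
        // Interactive query against the state store materialized by the topology
        ReadOnlyKeyValueStore<String, ReeferAggregate> store = streams.store(
                StoreQueryParameters.fromNameAndType("reefer-aggregates",
                        QueryableStoreTypes.<String, ReeferAggregate>keyValueStore()));
        ReeferAggregate aggregate = store.get(containerID);
        return aggregate == null ? Response.status(404).build() : Response.ok(aggregate).build();
    }
}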
The core of the process is a Kafka Streams topology in the class TelemetryAssessor.
The topology processes telemetry records and builds a new stream with the containerID as key and the telemetry payload as value. It then builds a KTable to keep an aggregate per container. The aggregate is defined in the ReeferAggregate class and keeps the maximum temperature read so far and the number of times the temperature threshold has been violated. Finally, when a container reaches the maximum number of temperature violations, a new message is sent to the 'reefer' topic for downstream processing.
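A minimal sketch of such a topology is shown below, assuming JSON-B serdes, ReeferAggregate helper methods (update, hasReachedMaxViolations, toReeferEvent), hard-coded topic names, and the store name reefer-aggregates; these names are illustrative and the actual TelemetryAssessor class may differ.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import io.quarkus.kafka.client.serialization.JsonbSerde;

// Sketch only: serde, method, topic, and store names are assumptions.
@ApplicationScoped
public class TelemetryAssessor {

    JsonbSerde<TelemetryEvent> telemetryEventSerde = new JsonbSerde<>(TelemetryEvent.class);
    JsonbSerde<Telemetry> telemetrySerde = new JsonbSerde<>(Telemetry.class);
    JsonbSerde<ReeferAggregate> aggregateSerde = new JsonbSerde<>(ReeferAggregate.class);
    JsonbSerde<ReeferEvent> reeferEventSerde = new JsonbSerde<>(ReeferEvent.class);

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Re-key each telemetry event with its containerID and keep the payload as value.
        KTable<String, ReeferAggregate> aggregates = builder
            .stream("telemetries", Consumed.with(Serdes.String(), telemetryEventSerde))
            .map((key, event) -> KeyValue.pair(event.containerID, event.payload))
            // 2. Group by containerID and keep one aggregate per container in a KTable.
            .groupByKey(Grouped.with(Serdes.String(), telemetrySerde))
            .aggregate(ReeferAggregate::new,
                       // assumes update() returns the updated aggregate
                       (containerID, telemetry, aggregate) -> aggregate.update(telemetry),
                       Materialized.<String, ReeferAggregate, KeyValueStore<Bytes, byte[]>>as("reefer-aggregates")
                           .withKeySerde(Serdes.String())
                           .withValueSerde(aggregateSerde));

        // 3. When a container reaches the maximum number of violations, emit an event to the reefer topic
        //    (the actual agent may instead publish through the reactive messaging channel configured later).
        aggregates.toStream()
            .filter((containerID, aggregate) -> aggregate.hasReachedMaxViolations())
            .mapValues(aggregate -> aggregate.toReeferEvent())
            .to("reefers", Produced.with(Serdes.String(), reeferEventSerde));

        return builder.build();
    }
}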
To learn more about Kafka Streams, read this introduction and work through these labs on programming with Kafka Streams.
A nice capability of a Quarkus app is that most of the work is done in the application.properties configuration. The highlight of this configuration is that, once deployed to OpenShift, the environment variables are defined in a config map and a secret:
quarkus.openshift.env.configmaps=agent-cm
quarkus.openshift.env.secrets=agent-secrets
The SSL certificates for the server and the user come from secrets:
quarkus.openshift.env.vars.KAFKA_CERT_PATH=/deployments/certs/server/ca.p12
quarkus.openshift.env.mapping.KAFKA_CERT_PWD.from-secret=kafka-cluster-ca-cert
quarkus.openshift.env.mapping.KAFKA_CERT_PWD.with-key=ca.password
quarkus.openshift.mounts.es-cert.path=/deployments/certs/server
quarkus.openshift.secret-volumes.es-cert.secret-name=kafka-cluster-ca-cert
# TLS user
quarkus.openshift.env.mapping.USER_CERT_PWD.from-secret=${KAFKA_USER}
quarkus.openshift.env.mapping.USER_CERT_PWD.with-key=user.password
quarkus.openshift.env.vars.USER_CERT_PATH=/deployments/certs/user/user.p12
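For illustration, the sketch below shows one way these environment variables can be mapped to standard Kafka SSL client properties at runtime; the KafkaConfig helper is an assumption, as the actual agent may wire this differently.

import java.util.Optional;
import java.util.Properties;

// Sketch only: maps the mounted certificate paths and passwords to standard Kafka SSL properties.
public class KafkaConfig {

    public static Properties sslProperties() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        // Cluster CA certificate mounted from the kafka-cluster-ca-cert secret
        props.put("ssl.truststore.location", System.getenv("KAFKA_CERT_PATH"));
        props.put("ssl.truststore.password", System.getenv("KAFKA_CERT_PWD"));
        props.put("ssl.truststore.type", "PKCS12");
        // TLS user credentials mounted from the KafkaUser secret
        Optional.ofNullable(System.getenv("USER_CERT_PATH"))
                .ifPresent(path -> {
                    props.put("ssl.keystore.location", path);
                    props.put("ssl.keystore.password", System.getenv("USER_CERT_PWD"));
                    props.put("ssl.keystore.type", "PKCS12");
                });
        return props;
    }
}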
Outgoing messages to the Kafka reefer topic are sent via the MicroProfile Reactive Messaging configuration and connector:
mp.messaging.outgoing.reefers.connector=smallrye-kafka
mp.messaging.outgoing.reefers.topic=${REEFER_TOPIC:vaccine-reefers}
mp.messaging.outgoing.reefers.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.reefers.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
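As an illustration of how application code can publish to this outgoing reefers channel, the sketch below uses a MicroProfile Reactive Messaging Emitter; the ReeferEventProducer class is an assumption, and the actual agent may publish from the Kafka Streams topology instead.

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

// Sketch only: publishes a reefer event on the outgoing 'reefers' channel configured above.
@ApplicationScoped
public class ReeferEventProducer {

    @Inject
    @Channel("reefers")
    Emitter<ReeferEvent> emitter;

    public void reportViolation(ReeferEvent event) {
        // The smallrye-kafka connector serializes the value with the configured JsonbSerializer.
        emitter.send(event);
    }
}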
The last part of the configuration is for Kafka Streams.
Build
As a Quarkus application, it can be run locally with ./mvnw quarkus:dev. It is important to configure a .env file with the environment variables needed to connect remotely to the Kafka cluster using a SCRAM user (these are resolved by the ${...} placeholders seen in application.properties):
export KAFKA_USER=app-scram
export KAFKA_PASSWORD=<>
export KAFKA_BOOTSTRAP_SERVERS=eda-dev-kafka-bootstrap-eventstreams.<....>.cloud:443
export KAFKA_SSL_TRUSTSTORE_LOCATION=${PWD}/certs/truststore.p12
export KAFKA_SSL_TRUSTSTORE_PASSWORD=<>
export TELEMETRY_TOPIC=coldchain-telemetries
export REEFER_TOPIC=coldchain-reefers
export PREDICTION_ENABLED=false
export EDA_LOGGING_LEVEL=INFO
See the repository readme to build and run it locally or to deploy it to OpenShift.
The cold chain monitoring use case also presents how to deploy on OpenShift; it uses the Kubernetes configuration and source-to-image capability, so one command builds and deploys to OpenShift:
./mvnw clean package -Dquarkus.kubernetes.deploy=true -DskipTests
Usage details
The demonstration script for this component is described in the cold chain monitoring use case.