diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml
index 1382195de6..2db62044b2 100644
--- a/spring-kafka/pom.xml
+++ b/spring-kafka/pom.xml
@@ -40,6 +40,11 @@
${testcontainers-kafka.version}
test
+
+ commons-collections
+ commons-collections
+ 3.2.1
+
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java b/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java
new file mode 100644
index 0000000000..9275151d50
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java
@@ -0,0 +1,15 @@
+package com.baeldung.monitoring;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class LagAnalyzerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(LagAnalyzerApplication.class, args);
+ while (true) ;
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/Makefile b/spring-kafka/src/main/java/com/baeldung/monitoring/Makefile
new file mode 100644
index 0000000000..f85334057d
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/Makefile
@@ -0,0 +1,10 @@
+build:
+ mvn clean install -B -U
+start-kafka:
+ docker-compose up -d
+check-kafka:
+ nc -z localhost 2181
+ nc -z localhost 9092
+ docker-compose logs kafka | grep -i 'started'
+stop-kafka:
+ docker-compose down --remove-orphans
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/README.md b/spring-kafka/src/main/java/com/baeldung/monitoring/README.md
new file mode 100644
index 0000000000..1b3f638ae2
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/README.md
@@ -0,0 +1,16 @@
+## Monitoring Consumer Lag
+
+## Spin Up Local Kafka Container
+```
+$ make start-kafka
+```
+
+## Verify that Kafka is Up
+```
+$ make check-kafka
+```
+
+## Stop Local Kafka Container
+```
+$ make stop-kafka
+```
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/docker-compose.yml b/spring-kafka/src/main/java/com/baeldung/monitoring/docker-compose.yml
new file mode 100644
index 0000000000..507b8e2f3c
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/docker-compose.yml
@@ -0,0 +1,23 @@
+version: '2'
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 22181:2181
+
+ kafka:
+ image: confluentinc/cp-kafka:latest
+ depends_on:
+ - zookeeper
+ ports:
+ - 9092:9092
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java
new file mode 100644
index 0000000000..b046f0b2c4
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java
@@ -0,0 +1,105 @@
+package com.baeldung.monitoring.service;
+
+import com.baeldung.monitoring.util.MonitoringUtil;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+@Service
+public class LagAnalyzerService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LagAnalyzerService.class);
+
+ private final AdminClient adminClient;
+ private final KafkaConsumer consumer;
+
+ @Autowired
+ public LagAnalyzerService(
+ @Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
+ adminClient = getAdminClient(bootstrapServerConfig);
+ consumer = getKafkaConsumer(bootstrapServerConfig);
+ }
+
+ public Map analyzeLag(
+ String groupId)
+ throws ExecutionException, InterruptedException {
+ Map consumerGrpOffsets = getConsumerGrpOffsets(groupId);
+ Map producerOffsets = getProducerOffsets(consumerGrpOffsets);
+ Map lags = computeLags(consumerGrpOffsets, producerOffsets);
+ for (Map.Entry lagEntry : lags.entrySet()) {
+ String topic = lagEntry.getKey().topic();
+ int partition = lagEntry.getKey().partition();
+ Long lag = lagEntry.getValue();
+ LOGGER.info("Time={} | Lag for topic = {}, partition = {} is {}",
+ MonitoringUtil.time(),
+ topic,
+ partition,
+ lag);
+ }
+ return lags;
+ }
+
+ public Map getConsumerGrpOffsets(String groupId)
+ throws ExecutionException, InterruptedException {
+ ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
+ Map metadataMap
+ = info.partitionsToOffsetAndMetadata().get();
+ Map groupOffset = new HashMap<>();
+ for (Map.Entry entry : metadataMap.entrySet()) {
+ TopicPartition key = entry.getKey();
+ OffsetAndMetadata metadata = entry.getValue();
+ groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
+ }
+ return groupOffset;
+ }
+
+ private Map getProducerOffsets(
+ Map consumerGrpOffset) {
+ List topicPartitions = new LinkedList<>();
+ for (Map.Entry entry : consumerGrpOffset.entrySet()) {
+ TopicPartition key = entry.getKey();
+ topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
+ }
+ return consumer.endOffsets(topicPartitions);
+ }
+
+ public Map computeLags(
+ Map consumerGrpOffsets,
+ Map producerOffsets) {
+ Map lags = new HashMap<>();
+ for (Map.Entry entry : consumerGrpOffsets.entrySet()) {
+ Long producerOffset = producerOffsets.get(entry.getKey());
+ Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
+ long lag = Math.abs(Math.max(0, producerOffset) - Math.max(0, consumerOffset));
+ lags.putIfAbsent(entry.getKey(), lag);
+ }
+ return lags;
+ }
+
+ private AdminClient getAdminClient(String bootstrapServerConfig) {
+ Properties config = new Properties();
+ config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
+ return AdminClient.create(config);
+ }
+
+ private KafkaConsumer getKafkaConsumer(String bootstrapServerConfig) {
+ Properties properties = new Properties();
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ return new KafkaConsumer<>(properties);
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java
new file mode 100644
index 0000000000..a20b9e9a0c
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java
@@ -0,0 +1,28 @@
+package com.baeldung.monitoring.service;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ExecutionException;
+
+@Service
+public class LiveLagAnalyzerService {
+
+ private final LagAnalyzerService lagAnalyzerService;
+ private final String groupId;
+
+ @Autowired
+ public LiveLagAnalyzerService(
+ LagAnalyzerService lagAnalyzerService,
+ @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
+ this.lagAnalyzerService = lagAnalyzerService;
+ this.groupId = groupId;
+ }
+
+ @Scheduled(fixedDelay = 5000L)
+ public void liveLagAnalysis() throws ExecutionException, InterruptedException {
+ lagAnalyzerService.analyzeLag(groupId);
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java
new file mode 100644
index 0000000000..2d376432e5
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java
@@ -0,0 +1,17 @@
+package com.baeldung.monitoring.simulation;
+
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ConsumerSimulator {
+
+ @KafkaListener(
+ topics = "${monitor.topic.name}",
+ containerFactory = "kafkaListenerContainerFactory",
+ autoStartup = "${monitor.consumer.simulate}")
+ public void listenGroup(String message) throws InterruptedException {
+ Thread.sleep(10L);
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java
new file mode 100644
index 0000000000..a4a8847bcf
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java
@@ -0,0 +1,54 @@
+package com.baeldung.monitoring.simulation;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+public class KafkaConsumerConfig {
+
+ @Value(value = "${monitor.kafka.bootstrap.config}")
+ private String bootstrapAddress;
+ @Value(value = "${monitor.kafka.consumer.groupid}")
+ private String groupId;
+ @Value(value = "${monitor.kafka.consumer.groupid.simulate}")
+ private String simulateGroupId;
+ @Value(value = "${monitor.producer.simulate}")
+ private boolean enabled;
+
+ public ConsumerFactory consumerFactory(String groupId) {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ if (enabled) {
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ } else {
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
+ }
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ if (enabled) {
+ factory.setConsumerFactory(consumerFactory(groupId));
+ } else {
+ factory.setConsumerFactory(consumerFactory(simulateGroupId));
+ }
+ return factory;
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaProducerConfig.java
new file mode 100644
index 0000000000..80048a47ee
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaProducerConfig.java
@@ -0,0 +1,29 @@
+package com.baeldung.monitoring.simulation;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaProducerConfig {
+
+ @Value("${monitor.kafka.bootstrap.config}")
+ private String bootstrapAddress;
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(configProps);
+ return new KafkaTemplate<>(producerFactory);
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java
new file mode 100644
index 0000000000..30476ff7ec
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java
@@ -0,0 +1,44 @@
+package com.baeldung.monitoring.simulation;
+
+import org.apache.commons.lang3.BooleanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.Date;
+import java.util.concurrent.ExecutionException;
+
+import static com.baeldung.monitoring.util.MonitoringUtil.endTime;
+import static com.baeldung.monitoring.util.MonitoringUtil.time;
+
+@Service
+public class ProducerSimulator {
+
+ private final KafkaTemplate kafkaTemplate;
+ private final String topicName;
+ private final boolean enabled;
+
+ @Autowired
+ public ProducerSimulator(
+ KafkaTemplate kafkaTemplate,
+ @Value(value = "${monitor.topic.name}") String topicName,
+ @Value(value = "${monitor.producer.simulate}") String enabled) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.topicName = topicName;
+ this.enabled = BooleanUtils.toBoolean(enabled);
+ }
+
+ @Scheduled(fixedDelay = 1L, initialDelay = 5L)
+ public void sendMessage() throws ExecutionException, InterruptedException {
+ if (enabled) {
+ if (endTime.after(new Date())) {
+ String message = "msg-" + time();
+ SendResult result = kafkaTemplate.send(topicName, message).get();
+ }
+ }
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/util/MonitoringUtil.java b/spring-kafka/src/main/java/com/baeldung/monitoring/util/MonitoringUtil.java
new file mode 100644
index 0000000000..a7ba78b1e3
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/util/MonitoringUtil.java
@@ -0,0 +1,17 @@
+package com.baeldung.monitoring.util;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public class MonitoringUtil {
+ public static final Date startTime = new Date();
+ public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);
+
+ public static String time() {
+ DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
+ LocalDateTime now = LocalDateTime.now();
+ String date = dtf.format(now);
+ return date;
+ }
+}
diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties
index eaf113191e..e6a4668da3 100644
--- a/spring-kafka/src/main/resources/application.properties
+++ b/spring-kafka/src/main/resources/application.properties
@@ -2,4 +2,14 @@ kafka.bootstrapAddress=localhost:9092
message.topic.name=baeldung
greeting.topic.name=greeting
filtered.topic.name=filtered
-partitioned.topic.name=partitioned
\ No newline at end of file
+partitioned.topic.name=partitioned
+
+# monitoring - lag analysis
+monitor.kafka.bootstrap.config=localhost:9092
+monitor.kafka.consumer.groupid=baeldungGrp
+monitor.topic.name=baeldung
+
+# monitoring - simulation
+monitor.producer.simulate=true
+monitor.consumer.simulate=true
+monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
\ No newline at end of file
diff --git a/spring-kafka/src/test/java/com/baeldung/monitoring/LiveLagAnalyzerServiceLiveTest.java b/spring-kafka/src/test/java/com/baeldung/monitoring/LiveLagAnalyzerServiceLiveTest.java
new file mode 100644
index 0000000000..9dcbbc4837
--- /dev/null
+++ b/spring-kafka/src/test/java/com/baeldung/monitoring/LiveLagAnalyzerServiceLiveTest.java
@@ -0,0 +1,174 @@
+package com.baeldung.monitoring;
+
+import com.baeldung.monitoring.service.LagAnalyzerService;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+@RunWith(SpringRunner.class)
+@DirtiesContext
+@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9085", "port=9085"})
+@EnableKafka
+public class LiveLagAnalyzerServiceLiveTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LiveLagAnalyzerServiceLiveTest.class);
+
+ private static KafkaConsumer consumer;
+ private static KafkaProducer producer;
+ private static LagAnalyzerService lagAnalyzerService;
+ private static final String GROUP_ID = "baeldungGrp";
+ private static final String TOPIC = "baeldung";
+ private static final int PARTITION = 0;
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+ private static final String BOOTSTRAP_SERVER_CONFIG = "localhost:9085";
+ private static final int BATCH_SIZE = 100;
+ private static final long POLL_DURATION = 1000L;
+
+ @Before
+ public void setup() throws Exception {
+ initProducer();
+ initConsumer();
+ lagAnalyzerService = new LagAnalyzerService(BOOTSTRAP_SERVER_CONFIG);
+ consume();
+ }
+
+ @Test
+ public void givenEmbeddedKafkaBroker_whenAllProducedMessagesAreConsumed_thenLagBecomesZero()
+ throws ExecutionException, InterruptedException {
+ produce();
+ long consumeLag = 0L;
+ consume();
+ Map lag = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lag);
+ Assert.assertEquals(1, lag.size());
+ consumeLag = lag.get(TOPIC_PARTITION);
+ Assert.assertEquals(0L, consumeLag);
+ produce();
+ produce();
+ lag = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lag);
+ Assert.assertEquals(1, lag.size());
+ consumeLag = lag.get(TOPIC_PARTITION);
+ Assert.assertEquals(200L, consumeLag);
+
+ produce();
+ produce();
+
+ lag = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lag);
+ Assert.assertEquals(1, lag.size());
+ consumeLag = lag.get(TOPIC_PARTITION);
+ Assert.assertEquals(400L, consumeLag);
+
+ produce();
+ lag = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lag);
+ Assert.assertEquals(1, lag.size());
+ consumeLag = lag.get(TOPIC_PARTITION);
+ Assert.assertEquals(500L, consumeLag);
+
+ consume();
+ lag = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lag);
+ Assert.assertEquals(1, lag.size());
+ consumeLag = lag.get(TOPIC_PARTITION);
+ Assert.assertEquals(consumeLag, 0L);
+ }
+
+ @Test
+ public void givenEmbeddedKafkaBroker_whenMessageNotConsumed_thenLagIsEqualToProducedMessage()
+ throws ExecutionException, InterruptedException {
+ produce();
+ Map lagByTopicPartition = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lagByTopicPartition);
+ Assert.assertEquals(1, lagByTopicPartition.size());
+ long LAG = lagByTopicPartition.get(TOPIC_PARTITION);
+ Assert.assertEquals(BATCH_SIZE, LAG);
+ }
+
+ @Test
+ public void givenEmbeddedKafkaBroker_whenMessageConsumedLessThanProduced_thenLagIsNonZero()
+ throws ExecutionException, InterruptedException {
+ produce();
+ consume();
+ produce();
+ produce();
+ Map lagByTopicPartition = lagAnalyzerService.analyzeLag(GROUP_ID);
+ Assert.assertNotNull(lagByTopicPartition);
+ Assert.assertEquals(1, lagByTopicPartition.size());
+ long LAG = lagByTopicPartition.get(TOPIC_PARTITION);
+ Assert.assertEquals(2 * BATCH_SIZE, LAG);
+ }
+
+ private static void consume() {
+ try {
+ ConsumerRecords record = consumer.poll(Duration.ofMillis(POLL_DURATION));
+ consumer.commitSync();
+ Thread.sleep(POLL_DURATION);
+ } catch (Exception ex) {
+ LOGGER.error("Exception caught in consume", ex);
+ }
+ }
+
+ private static void produce() {
+ try {
+ int count = BATCH_SIZE;
+ while (count > 0) {
+ String messageKey = UUID.randomUUID().toString();
+ String messageValue = UUID.randomUUID().toString() + "_" + count;
+ ProducerRecord producerRecord = new ProducerRecord<>(TOPIC, messageKey, messageValue);
+ RecordMetadata recordMetadata = producer.send(producerRecord).get();
+ LOGGER.info("Message with key = {}, value = {} sent to partition = {}, offset = {}, topic = {}",
+ messageKey,
+ messageValue,
+ recordMetadata.partition(),
+ recordMetadata.offset(),
+ recordMetadata.topic());
+ count--;
+ }
+ } catch (Exception ex) {
+ LOGGER.error("Exception caught in produce", ex);
+ }
+ }
+
+ private static void initConsumer() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER_CONFIG);
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+ consumer = new KafkaConsumer<>(props);
+ consumer.assign(Arrays.asList(TOPIC_PARTITION));
+ consumer.poll(Duration.ofMillis(1L));
+ consumer.commitSync();
+ }
+
+ private static void initProducer() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER_CONFIG);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producer = new KafkaProducer<>(configProps);
+ }
+}
diff --git a/spring-kafka/src/test/resources/application.yml b/spring-kafka/src/test/resources/application.yml
index 7d7997c6fd..8b245f08b1 100644
--- a/spring-kafka/src/test/resources/application.yml
+++ b/spring-kafka/src/test/resources/application.yml
@@ -4,4 +4,11 @@ spring:
auto-offset-reset: earliest
group-id: baeldung
test:
- topic: embedded-test-topic
\ No newline at end of file
+ topic: embedded-test-topic
+
+monitor:
+ kafka:
+ bootstrap:
+ config: "PLAINTEXT://localhost:9085"
+ consumer:
+ groupid: "baeldungGrp"