BAEL-4855 | Monitor the Kafka Consumer Lag in Java (#10866)

Tapan Avasthi 2021-06-26 04:52:01 +05:30 committed by GitHub
parent 01699ba08a
commit 52495bef53
15 changed files with 556 additions and 2 deletions

View File: pom.xml

@@ -40,6 +40,11 @@
            <version>${testcontainers-kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
    <properties>

View File: src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java

@@ -0,0 +1,15 @@
package com.baeldung.monitoring;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class LagAnalyzerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LagAnalyzerApplication.class, args);
        // busy-wait keeps the JVM alive so the @Scheduled lag analysis keeps running
        while (true);
    }
}

View File: Makefile

@@ -0,0 +1,10 @@
build:
	mvn clean install -B -U
start-kafka:
	docker-compose up -d
check-kafka:
	nc -z localhost 2181
	nc -z localhost 9092
	docker-compose logs kafka | grep -i 'started'
stop-kafka:
	docker-compose down --remove-orphans

View File: README.md

@@ -0,0 +1,16 @@
# Monitoring Consumer Lag

## Spin Up a Local Kafka Container
```
$ make start-kafka
```

## Verify That Kafka Is Up
```
$ make check-kafka
```

## Stop the Local Kafka Container
```
$ make stop-kafka
```

View File: docker-compose.yml

@@ -0,0 +1,23 @@
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # two listeners: kafka:29092 for clients inside the compose network,
      # localhost:9092 for clients on the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
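
Note: beyond `make check-kafka`, broker reachability from the host can be sanity-checked with Kafka's Admin API. This is a minimal sketch, not part of this commit; the class name is illustrative, and `localhost:9092` is assumed from the advertised host listener above.
```
package com.baeldung.monitoring.util;

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical helper (not in this commit): fails fast if the broker is unreachable.
public class BrokerConnectivityCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // host-side listener advertised in docker-compose.yml above (assumption)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() completes only when a broker responds
            int nodes = admin.describeCluster().nodes().get().size();
            System.out.println("Kafka is up; broker count = " + nodes);
        }
    }
}
```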

View File: src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java

@@ -0,0 +1,105 @@
package com.baeldung.monitoring.service;

import com.baeldung.monitoring.util.MonitoringUtil;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Service
public class LagAnalyzerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(LagAnalyzerService.class);

    private final AdminClient adminClient;
    private final KafkaConsumer<String, String> consumer;

    @Autowired
    public LagAnalyzerService(
        @Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
        adminClient = getAdminClient(bootstrapServerConfig);
        consumer = getKafkaConsumer(bootstrapServerConfig);
    }

    // lag per partition = log-end (producer) offset - committed consumer group offset
    public Map<TopicPartition, Long> analyzeLag(String groupId)
        throws ExecutionException, InterruptedException {
        Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
        Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
        Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
        for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
            String topic = lagEntry.getKey().topic();
            int partition = lagEntry.getKey().partition();
            Long lag = lagEntry.getValue();
            LOGGER.info("Time={} | Lag for topic = {}, partition = {} is {}",
                MonitoringUtil.time(),
                topic,
                partition,
                lag);
        }
        return lags;
    }

    // committed offsets of the consumer group, fetched via the AdminClient
    public Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId)
        throws ExecutionException, InterruptedException {
        ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
        Map<TopicPartition, OffsetAndMetadata> metadataMap
            = info.partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, Long> groupOffset = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : metadataMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndMetadata metadata = entry.getValue();
            groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
        }
        return groupOffset;
    }

    // log-end offsets of the same partitions, fetched via a dedicated consumer
    private Map<TopicPartition, Long> getProducerOffsets(
        Map<TopicPartition, Long> consumerGrpOffset) {
        List<TopicPartition> topicPartitions = new LinkedList<>();
        for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
            TopicPartition key = entry.getKey();
            topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
        }
        return consumer.endOffsets(topicPartitions);
    }

    public Map<TopicPartition, Long> computeLags(
        Map<TopicPartition, Long> consumerGrpOffsets,
        Map<TopicPartition, Long> producerOffsets) {
        Map<TopicPartition, Long> lags = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
            Long producerOffset = producerOffsets.get(entry.getKey());
            Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
            // clamp missing offsets to 0 before taking the difference
            long lag = Math.abs(Math.max(0, producerOffset) - Math.max(0, consumerOffset));
            lags.putIfAbsent(entry.getKey(), lag);
        }
        return lags;
    }

    private AdminClient getAdminClient(String bootstrapServerConfig) {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
        return AdminClient.create(config);
    }

    private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(properties);
    }
}
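
Note: for reference, the service above can also be driven directly, outside the scheduler shown in the next file. A minimal sketch; the wrapper class is illustrative and not part of this commit, and the bootstrap address and group id are assumed from application.properties below.
```
package com.baeldung.monitoring;

import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import com.baeldung.monitoring.service.LagAnalyzerService;

// Hypothetical one-shot runner (not in this commit)
public class LagAnalyzerSketch {

    public static void main(String[] args) throws Exception {
        // values assumed from application.properties in this commit
        LagAnalyzerService service = new LagAnalyzerService("localhost:9092");
        Map<TopicPartition, Long> lags = service.analyzeLag("baeldungGrp");
        lags.forEach((tp, lag) ->
            System.out.printf("%s-%d lag = %d%n", tp.topic(), tp.partition(), lag));
    }
}
```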

View File: src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java

@@ -0,0 +1,28 @@
package com.baeldung.monitoring.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class LiveLagAnalyzerService {

    private final LagAnalyzerService lagAnalyzerService;
    private final String groupId;

    @Autowired
    public LiveLagAnalyzerService(
        LagAnalyzerService lagAnalyzerService,
        @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
        this.lagAnalyzerService = lagAnalyzerService;
        this.groupId = groupId;
    }

    // re-compute and log the lag every 5 seconds
    @Scheduled(fixedDelay = 5000L)
    public void liveLagAnalysis() throws ExecutionException, InterruptedException {
        lagAnalyzerService.analyzeLag(groupId);
    }
}

View File: src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java

@@ -0,0 +1,17 @@
package com.baeldung.monitoring.simulation;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class ConsumerSimulator {

    @KafkaListener(
        topics = "${monitor.topic.name}",
        containerFactory = "kafkaListenerContainerFactory",
        autoStartup = "${monitor.consumer.simulate}")
    public void listenGroup(String message) throws InterruptedException {
        // simulate slow processing so that consumer lag can build up
        Thread.sleep(10L);
    }
}

View File: src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java

@@ -0,0 +1,54 @@
package com.baeldung.monitoring.simulation;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${monitor.kafka.bootstrap.config}")
    private String bootstrapAddress;

    @Value(value = "${monitor.kafka.consumer.groupid}")
    private String groupId;

    @Value(value = "${monitor.kafka.consumer.groupid.simulate}")
    private String simulateGroupId;

    @Value(value = "${monitor.producer.simulate}")
    private boolean enabled;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // when the simulation is enabled, consume with the monitored group id;
        // otherwise fall back to the dedicated simulation group
        if (enabled) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        } else {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
        }
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        if (enabled) {
            factory.setConsumerFactory(consumerFactory(groupId));
        } else {
            factory.setConsumerFactory(consumerFactory(simulateGroupId));
        }
        return factory;
    }
}

View File: src/main/java/com/baeldung/monitoring/simulation/KafkaProducerConfig.java

@@ -0,0 +1,29 @@
package com.baeldung.monitoring.simulation;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${monitor.kafka.bootstrap.config}")
    private String bootstrapAddress;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
        return new KafkaTemplate<>(producerFactory);
    }
}

View File: src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java

@@ -0,0 +1,44 @@
package com.baeldung.monitoring.simulation;

import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.concurrent.ExecutionException;

import static com.baeldung.monitoring.util.MonitoringUtil.endTime;
import static com.baeldung.monitoring.util.MonitoringUtil.time;

@Service
public class ProducerSimulator {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName;
    private final boolean enabled;

    @Autowired
    public ProducerSimulator(
        KafkaTemplate<String, String> kafkaTemplate,
        @Value(value = "${monitor.topic.name}") String topicName,
        @Value(value = "${monitor.producer.simulate}") String enabled) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
        this.enabled = BooleanUtils.toBoolean(enabled);
    }

    // publish a message roughly every millisecond until endTime (30s after startup)
    @Scheduled(fixedDelay = 1L, initialDelay = 5L)
    public void sendMessage() throws ExecutionException, InterruptedException {
        if (enabled && endTime.after(new Date())) {
            String message = "msg-" + time();
            kafkaTemplate.send(topicName, message).get();
        }
    }
}

View File: src/main/java/com/baeldung/monitoring/util/MonitoringUtil.java

@@ -0,0 +1,17 @@
package com.baeldung.monitoring.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class MonitoringUtil {

    public static final Date startTime = new Date();
    // the producer simulation runs for 30 seconds after startup
    public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

    public static String time() {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
        LocalDateTime now = LocalDateTime.now();
        return dtf.format(now);
    }
}

View File: src/main/resources/application.properties

@@ -2,4 +2,14 @@ kafka.bootstrapAddress=localhost:9092
message.topic.name=baeldung
greeting.topic.name=greeting
filtered.topic.name=filtered
partitioned.topic.name=partitioned

# monitoring - lag analysis
monitor.kafka.bootstrap.config=localhost:9092
monitor.kafka.consumer.groupid=baeldungGrp
monitor.topic.name=baeldung

# monitoring - simulation
monitor.producer.simulate=true
monitor.consumer.simulate=true
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate

View File: src/test/java/com/baeldung/monitoring/LiveLagAnalyzerServiceLiveTest.java

@@ -0,0 +1,174 @@
package com.baeldung.monitoring;

import com.baeldung.monitoring.service.LagAnalyzerService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9085", "port=9085" })
@EnableKafka
public class LiveLagAnalyzerServiceLiveTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(LiveLagAnalyzerServiceLiveTest.class);

    private static KafkaConsumer<String, String> consumer;
    private static KafkaProducer<String, String> producer;
    private static LagAnalyzerService lagAnalyzerService;

    private static final String GROUP_ID = "baeldungGrp";
    private static final String TOPIC = "baeldung";
    private static final int PARTITION = 0;
    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
    private static final String BOOTSTRAP_SERVER_CONFIG = "localhost:9085";
    private static final int BATCH_SIZE = 100;
    private static final long POLL_DURATION = 1000L;

    @Before
    public void setup() throws Exception {
        initProducer();
        initConsumer();
        lagAnalyzerService = new LagAnalyzerService(BOOTSTRAP_SERVER_CONFIG);
        consume();
    }

    @Test
    public void givenEmbeddedKafkaBroker_whenAllProducedMessagesAreConsumed_thenLagBecomesZero()
        throws ExecutionException, InterruptedException {
        produce();
        consume();
        Map<TopicPartition, Long> lag = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lag);
        Assert.assertEquals(1, lag.size());
        long consumeLag = lag.get(TOPIC_PARTITION);
        Assert.assertEquals(0L, consumeLag);

        // two unconsumed batches of 100 => lag of 200
        produce();
        produce();
        lag = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lag);
        Assert.assertEquals(1, lag.size());
        consumeLag = lag.get(TOPIC_PARTITION);
        Assert.assertEquals(200L, consumeLag);

        produce();
        produce();
        lag = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lag);
        Assert.assertEquals(1, lag.size());
        consumeLag = lag.get(TOPIC_PARTITION);
        Assert.assertEquals(400L, consumeLag);

        produce();
        lag = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lag);
        Assert.assertEquals(1, lag.size());
        consumeLag = lag.get(TOPIC_PARTITION);
        Assert.assertEquals(500L, consumeLag);

        // draining the topic brings the lag back to zero
        consume();
        lag = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lag);
        Assert.assertEquals(1, lag.size());
        consumeLag = lag.get(TOPIC_PARTITION);
        Assert.assertEquals(0L, consumeLag);
    }

    @Test
    public void givenEmbeddedKafkaBroker_whenMessageNotConsumed_thenLagIsEqualToProducedMessage()
        throws ExecutionException, InterruptedException {
        produce();
        Map<TopicPartition, Long> lagByTopicPartition = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lagByTopicPartition);
        Assert.assertEquals(1, lagByTopicPartition.size());
        long lag = lagByTopicPartition.get(TOPIC_PARTITION);
        Assert.assertEquals(BATCH_SIZE, lag);
    }

    @Test
    public void givenEmbeddedKafkaBroker_whenMessageConsumedLessThanProduced_thenLagIsNonZero()
        throws ExecutionException, InterruptedException {
        produce();
        consume();
        produce();
        produce();
        Map<TopicPartition, Long> lagByTopicPartition = lagAnalyzerService.analyzeLag(GROUP_ID);
        Assert.assertNotNull(lagByTopicPartition);
        Assert.assertEquals(1, lagByTopicPartition.size());
        long lag = lagByTopicPartition.get(TOPIC_PARTITION);
        Assert.assertEquals(2 * BATCH_SIZE, lag);
    }

    private static void consume() {
        try {
            // poll once, commit the offsets, and give the broker a moment to settle
            consumer.poll(Duration.ofMillis(POLL_DURATION));
            consumer.commitSync();
            Thread.sleep(POLL_DURATION);
        } catch (Exception ex) {
            LOGGER.error("Exception caught in consume", ex);
        }
    }

    private static void produce() {
        try {
            int count = BATCH_SIZE;
            while (count > 0) {
                String messageKey = UUID.randomUUID().toString();
                String messageValue = UUID.randomUUID().toString() + "_" + count;
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, messageKey, messageValue);
                RecordMetadata recordMetadata = producer.send(producerRecord).get();
                LOGGER.info("Message with key = {}, value = {} sent to partition = {}, offset = {}, topic = {}",
                    messageKey,
                    messageValue,
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    recordMetadata.topic());
                count--;
            }
        } catch (Exception ex) {
            LOGGER.error("Exception caught in produce", ex);
        }
    }

    private static void initConsumer() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER_CONFIG);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        consumer = new KafkaConsumer<>(props);
        consumer.assign(Arrays.asList(TOPIC_PARTITION));
        consumer.poll(Duration.ofMillis(1L));
        consumer.commitSync();
    }

    private static void initProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER_CONFIG);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producer = new KafkaProducer<>(configProps);
    }
}

View File: src/test/resources/application.yml

@@ -4,4 +4,11 @@ spring:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic
monitor:
  kafka:
    bootstrap:
      config: "PLAINTEXT://localhost:9085"
    consumer:
      groupid: "baeldungGrp"