BAEL-5170: Expose Kafka Consumer Metrics via Actuator Endpoint (#13723)

Co-authored-by: Tapan Avasthi <tavasthi@Tapans-MacBook-Air.local>
Tapan Avasthi authored on 2023-03-29 04:15:00 +05:30, committed by GitHub
parent 0895faa78c
commit 4050f47183
8 changed files with 92 additions and 59 deletions

pom.xml

@@ -23,6 +23,16 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+            <version>3.0.5</version>
+        </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <version>1.10.5</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
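
Reviewer note: these two additions carry the whole commit. spring-boot-starter-actuator brings in Micrometer core and the management endpoints, while micrometer-registry-prometheus backs the /actuator/prometheus endpoint enabled in the properties below. The explicit versions (3.0.5, 1.10.5) appear to be pinned to match each other; under the Spring Boot parent or BOM they could normally be omitted.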

LagAnalyzerApplication.java

@@ -10,6 +10,7 @@ public class LagAnalyzerApplication {
     public static void main(String[] args) {
         SpringApplication.run(LagAnalyzerApplication.class, args);
-        while (true) ;
+        while (true)
+            ;
     }
 }
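
Reviewer note: the while (true) ; spin only keeps the JVM alive so the scheduled lag analysis keeps logging, but it burns a CPU core doing so. A gentler sketch of an alternative (the join-on-self idiom blocks the main thread forever; main would need to declare throws InterruptedException):

    // Hypothetical alternative to the busy-wait: park the main thread instead of spinning.
    Thread.currentThread().join();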

LagAnalyzerService.java

@@ -1,6 +1,7 @@
 package com.baeldung.monitoring.service;
+import com.baeldung.monitoring.util.MonitoringUtil;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
@@ -27,36 +28,38 @@ public class LagAnalyzerService {
     private final KafkaConsumer<String, String> consumer;
     @Autowired
-    public LagAnalyzerService(
-        @Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
+    public LagAnalyzerService(@Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
         adminClient = getAdminClient(bootstrapServerConfig);
         consumer = getKafkaConsumer(bootstrapServerConfig);
     }
-    public Map<TopicPartition, Long> analyzeLag(
-        String groupId)
-        throws ExecutionException, InterruptedException {
+    public Map<TopicPartition, Long> analyzeLag(String groupId)
+          throws ExecutionException, InterruptedException {
         Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
         Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
         Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
         for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
-            String topic = lagEntry.getKey().topic();
-            int partition = lagEntry.getKey().partition();
+            String topic = lagEntry.getKey()
+                .topic();
+            int partition = lagEntry.getKey()
+                .partition();
             Long lag = lagEntry.getValue();
-            LOGGER.info("Time={} | Lag for topic = {}, partition = {} is {}",
-                MonitoringUtil.time(),
-                topic,
-                partition,
-                lag);
+            LOGGER.info("Time={} | Lag for topic = {}, partition = {}, groupId = {} is {}",
+                MonitoringUtil.time(),
+                topic,
+                partition,
+                groupId,
+                lag);
         }
         return lags;
     }
     public Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId)
-        throws ExecutionException, InterruptedException {
+          throws ExecutionException, InterruptedException {
         ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
-        Map<TopicPartition, OffsetAndMetadata> metadataMap
-            = info.partitionsToOffsetAndMetadata().get();
+        Map<TopicPartition, OffsetAndMetadata> metadataMap = info
+            .partitionsToOffsetAndMetadata()
+            .get();
         Map<TopicPartition, Long> groupOffset = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : metadataMap.entrySet()) {
             TopicPartition key = entry.getKey();
@@ -66,8 +69,7 @@ public class LagAnalyzerService {
         return groupOffset;
     }
-    private Map<TopicPartition, Long> getProducerOffsets(
-        Map<TopicPartition, Long> consumerGrpOffset) {
+    private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
         List<TopicPartition> topicPartitions = new LinkedList<>();
         for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
             TopicPartition key = entry.getKey();
@@ -77,9 +79,9 @@ public class LagAnalyzerService {
     }
     public Map<TopicPartition, Long> computeLags(
-        Map<TopicPartition, Long> consumerGrpOffsets,
-        Map<TopicPartition, Long> producerOffsets) {
-        Map<TopicPartition, Long> lags = new HashMap<>();
+          Map<TopicPartition, Long> consumerGrpOffsets,
+          Map<TopicPartition, Long> producerOffsets) {
+        Map<TopicPartition, Long> lags = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
             Long producerOffset = producerOffsets.get(entry.getKey());
             Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
@@ -91,15 +93,24 @@ public class LagAnalyzerService {
     private AdminClient getAdminClient(String bootstrapServerConfig) {
         Properties config = new Properties();
-        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
+        config.put(
+            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            bootstrapServerConfig);
         return AdminClient.create(config);
     }
-    private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
+    private KafkaConsumer<String, String> getKafkaConsumer(
+        String bootstrapServerConfig) {
         Properties properties = new Properties();
-        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
-        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        properties.setProperty(
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            bootstrapServerConfig);
+        properties.setProperty(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            StringDeserializer.class.getName());
+        properties.setProperty(
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            StringDeserializer.class.getName());
         return new KafkaConsumer<>(properties);
     }
 }
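
Reviewer note: the body of computeLags falls between hunks, so the subtraction itself is not shown; presumably it is the per-partition difference between the producer's end offset and the group's committed offset. A minimal sketch of that elided step, using the loop variables visible above and assuming both offsets are present:

    // Assumed core of computeLags: lag = end offset - committed offset,
    // floored at zero since a stale producer snapshot can briefly read lower.
    long lag = Math.max(0L, producerOffset - consumerOffset);
    lags.put(entry.getKey(), lag);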

LiveLagAnalyzerService.java

@@ -15,8 +15,8 @@ public class LiveLagAnalyzerService {
     @Autowired
     public LiveLagAnalyzerService(
-        LagAnalyzerService lagAnalyzerService,
-        @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
+          LagAnalyzerService lagAnalyzerService,
+          @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
         this.lagAnalyzerService = lagAnalyzerService;
         this.groupId = groupId;
     }

ConsumerSimulator.java

@@ -7,10 +7,9 @@ import org.springframework.stereotype.Service;
 @Service
 public class ConsumerSimulator {
-    @KafkaListener(
-        topics = "${monitor.topic.name}",
-        containerFactory = "kafkaListenerContainerFactory",
-        autoStartup = "${monitor.consumer.simulate}")
+    @KafkaListener(topics = "${monitor.topic.name}",
+        containerFactory = "kafkaListenerContainerFactory",
+        autoStartup = "${monitor.consumer.simulate}")
     public void listenGroup(String message) throws InterruptedException {
         Thread.sleep(10L);
     }
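
Reviewer note: the Thread.sleep(10L) in the listener deliberately throttles the simulated consumer to roughly one record per 10 ms, so it falls behind ProducerSimulator and a non-zero lag shows up both in the log output and in the kafka.consumer metrics this commit exposes.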

KafkaConsumerConfig.java

@@ -2,53 +2,54 @@ package com.baeldung.monitoring.simulation;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.PropertySource;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.MicrometerConsumerListener;
+import org.springframework.stereotype.Component;
 import java.util.HashMap;
 import java.util.Map;
+import io.micrometer.core.instrument.MeterRegistry;
 @EnableKafka
 @Configuration
+@Component
 public class KafkaConsumerConfig {
     @Value(value = "${monitor.kafka.bootstrap.config}")
     private String bootstrapAddress;
     @Value(value = "${monitor.kafka.consumer.groupid}")
     private String groupId;
-    @Value(value = "${monitor.kafka.consumer.groupid.simulate}")
-    private String simulateGroupId;
-    @Value(value = "${monitor.producer.simulate}")
-    private boolean enabled;
-    public ConsumerFactory<String, String> consumerFactory(String groupId) {
+    @Autowired
+    private MeterRegistry meterRegistry;
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
         Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        if (enabled) {
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        } else {
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
-        }
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
-        return new DefaultKafkaConsumerFactory<>(props);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
+        consumerFactory.addListener(new MicrometerConsumerListener<>(this.meterRegistry));
+        return consumerFactory;
     }
     @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-        if (enabled) {
-            factory.setConsumerFactory(consumerFactory(groupId));
-        } else {
-            factory.setConsumerFactory(consumerFactory(simulateGroupId));
-        }
-        return factory;
+    public ConcurrentKafkaListenerContainerFactory<String, String>
+        kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory<String, String> consumerFactory) {
+        ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory =
+            new ConcurrentKafkaListenerContainerFactory<>();
+        listenerContainerFactory.setConsumerFactory(consumerFactory);
+        return listenerContainerFactory;
     }
 }
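
Reviewer note: the functional change in this file is the MicrometerConsumerListener. DefaultKafkaConsumerFactory notifies the listener whenever it creates or closes a consumer, and the listener binds that client's internal Kafka metrics to the injected MeterRegistry as kafka.consumer.* meters, which is what the Actuator endpoints then serve. (The added @Component is redundant next to @Configuration, which is itself a stereotype annotation, though harmless.) As a sketch of reading one of the bound meters back, with the exact meter name being an assumption:

    // Hypothetical lookup once the consumer is running; the meter name
    // "kafka.consumer.fetch.manager.records.consumed.total" is assumed, not confirmed.
    double consumed = meterRegistry.get("kafka.consumer.fetch.manager.records.consumed.total")
        .functionCounter()
        .count();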

ProducerSimulator.java

@@ -23,10 +23,9 @@ public class ProducerSimulator {
     private final boolean enabled;
     @Autowired
-    public ProducerSimulator(
-        KafkaTemplate<String, String> kafkaTemplate,
-        @Value(value = "${monitor.topic.name}") String topicName,
-        @Value(value = "${monitor.producer.simulate}") String enabled) {
+    public ProducerSimulator(KafkaTemplate<String, String> kafkaTemplate,
+        @Value(value = "${monitor.topic.name}") String topicName,
+        @Value(value = "${monitor.producer.simulate}") String enabled) {
         this.kafkaTemplate = kafkaTemplate;
         this.topicName = topicName;
         this.enabled = BooleanUtils.toBoolean(enabled);
@@ -37,7 +36,9 @@ public class ProducerSimulator {
         if (enabled) {
             if (endTime.after(new Date())) {
                 String message = "msg-" + time();
-                SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
+                SendResult<String, String> result = kafkaTemplate
+                    .send(topicName, message)
+                    .get();
             }
         }
     }
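
Reviewer note: calling .get() on the future returned by kafkaTemplate.send(...) makes each send synchronous, so the simulator produces at a broker-acknowledged, roughly steady rate until endTime; the otherwise unused result variable is presumably kept for inspection while debugging.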

application.properties

@@ -1,3 +1,4 @@
+server.port=8081
 spring.kafka.bootstrap-servers=localhost:9092
 message.topic.name=baeldung
 long.message.topic.name=longMessage
@@ -15,3 +16,12 @@ monitor.consumer.simulate=true
 monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
 test.topic=testtopic1
 management.endpoints.web.base-path=/actuator
+management.endpoints.web.exposure.include=*
+management.endpoint.health.show-details=always
+management.endpoint.metrics.enabled=true
+management.endpoint.prometheus.enabled=true
+spring.jmx.enabled=false
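
Reviewer note: with these properties (and server.port=8081 from the first hunk), the metrics should be reachable once the app is up: per-meter details under http://localhost:8081/actuator/metrics/{meter.name}, and the full Prometheus scrape at http://localhost:8081/actuator/prometheus. Be aware that management.endpoints.web.exposure.include=* exposes every Actuator endpoint over HTTP, which suits a demo but is too broad for production.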