Update KafkaApplication.java (#12424)

* Update KafkaApplication.java

Added partition offset to the listener so that it can start consuming the messages from the beginning of the topic.

* Revert "Update KafkaApplication.java"

This reverts commit 28801f85174af8bf2d2bb89e1fb39b52c49a6cf0.

* Counting Messages

1. Counting messages from Producer offset
2. Counting messages from Consumer
3. Rest APIs to view the count

* Refactoring

* Adding 3rd programmatic method to count number of messages in Kafka topic

* Removing irrelevant code

* Server port removed
This commit is contained in:
Muhammad Abdullah Azam Khan 2022-08-08 08:03:20 +04:00 committed by GitHub
parent 2db5fcea1e
commit ef0b7ea20c
2 changed files with 63 additions and 0 deletions

View File

@ -0,0 +1,11 @@
package com.baeldung.countingmessages;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class,args);
}
}

View File

@ -0,0 +1,52 @@
package com.baeldung.countingmessages;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@Component
public class KafkaCountingMessagesComponent {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public static Map<String, Object> props = new HashMap<>();
@PostConstruct
public void init(){
System.out.println(getTotalNumberOfMessagesInATopic("baeldung"));
}
public Long getTotalNumberOfMessagesInATopic(String topic){
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(getProps());
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
return partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
}
public Map<String, Object> getProps() {
if (props.isEmpty()) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
}
return props;
}
}