diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 894eab2576..f94ce7748b 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -2,9 +2,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> com.baeldung - parent-boot-2 + parent-boot-3 0.0.1-SNAPSHOT - ../parent-boot-2 + ../parent-boot-3 4.0.0 @@ -22,6 +22,7 @@ org.springframework.kafka spring-kafka + ${spring-kafka.version} com.fasterxml.jackson.core @@ -54,8 +55,9 @@ 17 - 3.0.12 + 3.1.2 1.19.3 4.2.0 + org.springframework.boot.SpringApplication.Application diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java index ea4211ab53..99d676e6c5 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/deserialization/exception/KafkaErrorHandler.java @@ -13,8 +13,9 @@ class KafkaErrorHandler implements CommonErrorHandler { private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class); @Override - public void handleRecord(Exception exception, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { + public boolean handleOne(Exception exception, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { handle(exception, consumer); + return true; } @Override diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/viewheaders/KafkaMessageConsumer.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/viewheaders/KafkaMessageConsumer.java index 3bdc13d968..a9a414ac9f 100644 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/viewheaders/KafkaMessageConsumer.java +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/viewheaders/KafkaMessageConsumer.java @@ -20,13 +20,13 @@ public class KafkaMessageConsumer { String topicName = (String) headers.get(KafkaHeaders.TOPIC); System.out.println("Topic: " + topicName); - int partitionID = (int) headers.get(KafkaHeaders.RECEIVED_PARTITION_ID); + int partitionID = (int) headers.get(KafkaHeaders.RECEIVED_PARTITION); System.out.println("Partition ID: " + partitionID); } @KafkaListener(topics = { "my-topic" }, groupId = "my-consumer-group") public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) { System.out.println("Topic: " + topicName); System.out.println("Partition ID: " + partition); }