JAVA-32180 Upgrade spring-kafka-3 to boot-3 (#16120)

This commit is contained in:
anuragkumawat 2024-03-16 18:08:45 +05:30 committed by GitHub
parent c30d36118d
commit 9d0e847f15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 9 additions and 6 deletions

View File

@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent> <parent>
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId> <artifactId>parent-boot-3</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath> <relativePath>../parent-boot-3</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -22,6 +22,7 @@
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
@ -54,8 +55,9 @@
<properties> <properties>
<java.version>17</java.version> <java.version>17</java.version>
<kafka-version>3.0.12</kafka-version> <spring-kafka.version>3.1.2</spring-kafka.version>
<testcontainers.version>1.19.3</testcontainers.version> <testcontainers.version>1.19.3</testcontainers.version>
<awaitility.version>4.2.0</awaitility.version> <awaitility.version>4.2.0</awaitility.version>
<start-class>org.springframework.boot.SpringApplication.Application</start-class>
</properties> </properties>
</project> </project>

View File

@ -13,8 +13,9 @@ class KafkaErrorHandler implements CommonErrorHandler {
private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class); private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class);
@Override @Override
public void handleRecord(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) { public boolean handleOne(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
handle(exception, consumer); handle(exception, consumer);
return true;
} }
@Override @Override

View File

@ -20,13 +20,13 @@ public class KafkaMessageConsumer {
String topicName = (String) headers.get(KafkaHeaders.TOPIC); String topicName = (String) headers.get(KafkaHeaders.TOPIC);
System.out.println("Topic: " + topicName); System.out.println("Topic: " + topicName);
int partitionID = (int) headers.get(KafkaHeaders.RECEIVED_PARTITION_ID); int partitionID = (int) headers.get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println("Partition ID: " + partitionID); System.out.println("Partition ID: " + partitionID);
} }
@KafkaListener(topics = { "my-topic" }, groupId = "my-consumer-group") @KafkaListener(topics = { "my-topic" }, groupId = "my-consumer-group")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName, public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
System.out.println("Topic: " + topicName); System.out.println("Topic: " + topicName);
System.out.println("Partition ID: " + partition); System.out.println("Partition ID: " + partition);
} }