Merge pull request #13538 from anuragkumawat/JAVA-18598

JAVA-18598 Update Intro to Apache Kafka with Spring article
This commit is contained in:
Loredana Crusoveanu 2023-02-26 18:19:21 +02:00 committed by GitHub
commit 3b2d9be8b3
6 changed files with 41 additions and 25 deletions

View File

@ -9,9 +9,9 @@
<parent> <parent>
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
<artifactId>parent-boot-2</artifactId> <artifactId>parent-boot-3</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath> <relativePath>../parent-boot-3</relativePath>
</parent> </parent>
<dependencies> <dependencies>
@ -61,8 +61,24 @@
<artifactId>awaitility</artifactId> <artifactId>awaitility</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
<properties> <properties>
<testcontainers-kafka.version>1.16.2</testcontainers-kafka.version> <testcontainers-kafka.version>1.16.2</testcontainers-kafka.version>
</properties> </properties>

View File

@ -6,7 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;

View File

@ -15,9 +15,12 @@ public class KafkaProducer {
public void sendMessage(String message, String topic) { public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message); log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message) kafkaTemplate.send(topic, "key", message)
.addCallback( .whenComplete((result, ex) -> {
result -> log.info("Message sent to topic: {}", message), if (ex == null) {
ex -> log.error("Failed to send message", ex) log.info("Message sent to topic: {}", message);
); } else {
log.error("Failed to send message", ex);
}
});
} }
} }

View File

@ -15,9 +15,12 @@ public class KafkaProducer {
public void sendMessage(String message) { public void sendMessage(String message) {
kafkaTemplate.send("input-topic", message) kafkaTemplate.send("input-topic", message)
.addCallback( .whenComplete((result, ex) -> {
result -> log.info("Message sent to topic: {}", message), if (ex == null) {
ex -> log.error("Failed to send message", ex) log.info("Message sent to topic: {}", message);
); } else {
log.error("Failed to send message", ex);
}
});
} }
} }

View File

@ -1,5 +1,6 @@
package com.baeldung.spring.kafka; package com.baeldung.spring.kafka;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -16,8 +17,6 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@SpringBootApplication @SpringBootApplication
public class KafkaApplication { public class KafkaApplication {
@ -102,18 +101,13 @@ public class KafkaApplication {
public void sendMessage(String message) { public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.whenComplete((result, ex) -> {
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { if (ex == null) {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata() System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
.offset() + "]"); .offset() + "]");
} } else {
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
} }
}); });
@ -155,13 +149,13 @@ public class KafkaApplication {
} }
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory") @KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition); System.out.println("Received Message: " + message + " from partition: " + partition);
latch.countDown(); latch.countDown();
} }
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory") @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition); System.out.println("Received Message: " + message + " from partition: " + partition);
this.partitionLatch.countDown(); this.partitionLatch.countDown();
} }

View File

@ -20,7 +20,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort; import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.HttpEntity; import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;