JAVA-18598 Update Intro to Apache Kafka with Spring article
This commit is contained in:
parent
8a02df3f58
commit
d1144a5605
|
@ -9,9 +9,9 @@
|
|||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-boot-2</artifactId>
|
||||
<artifactId>parent-boot-3</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-boot-2</relativePath>
|
||||
<relativePath>../parent-boot-3</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -61,8 +61,24 @@
|
|||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<testcontainers-kafka.version>1.16.2</testcontainers-kafka.version>
|
||||
</properties>
|
||||
|
|
|
@ -6,7 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
|||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
|
@ -15,9 +15,12 @@ public class KafkaProducer {
|
|||
public void sendMessage(String message, String topic) {
|
||||
log.info("Producing message: {}", message);
|
||||
kafkaTemplate.send(topic, "key", message)
|
||||
.addCallback(
|
||||
result -> log.info("Message sent to topic: {}", message),
|
||||
ex -> log.error("Failed to send message", ex)
|
||||
);
|
||||
.whenComplete((result, ex) -> {
|
||||
if (ex == null) {
|
||||
log.info("Message sent to topic: {}", message);
|
||||
} else {
|
||||
log.error("Failed to send message", ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,9 +15,12 @@ public class KafkaProducer {
|
|||
|
||||
public void sendMessage(String message) {
|
||||
kafkaTemplate.send("input-topic", message)
|
||||
.addCallback(
|
||||
result -> log.info("Message sent to topic: {}", message),
|
||||
ex -> log.error("Failed to send message", ex)
|
||||
);
|
||||
.whenComplete((result, ex) -> {
|
||||
if (ex == null) {
|
||||
log.info("Message sent to topic: {}", message);
|
||||
} else {
|
||||
log.error("Failed to send message", ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.baeldung.spring.kafka;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -16,8 +17,6 @@ import org.springframework.kafka.support.KafkaHeaders;
|
|||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
@SpringBootApplication
|
||||
public class KafkaApplication {
|
||||
|
@ -102,18 +101,13 @@ public class KafkaApplication {
|
|||
|
||||
public void sendMessage(String message) {
|
||||
|
||||
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
|
||||
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
|
||||
future.whenComplete((result, ex) -> {
|
||||
|
||||
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult<String, String> result) {
|
||||
if (ex == null) {
|
||||
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
|
||||
.offset() + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
} else {
|
||||
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
|
||||
}
|
||||
});
|
||||
|
@ -155,13 +149,13 @@ public class KafkaApplication {
|
|||
}
|
||||
|
||||
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
|
||||
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
|
||||
System.out.println("Received Message: " + message + " from partition: " + partition);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
|
||||
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
|
||||
System.out.println("Received Message: " + message + " from partition: " + partition);
|
||||
this.partitionLatch.countDown();
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.junit.jupiter.api.io.TempDir;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.boot.test.web.server.LocalServerPort;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
|
Loading…
Reference in New Issue