diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml index 5c370880b4..b76d4f10c0 100644 --- a/spring-kafka/pom.xml +++ b/spring-kafka/pom.xml @@ -33,7 +33,7 @@ - 1.1.3.RELEASE + 2.2.2.RELEASE 2.9.7 diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java index 4ee7f40335..b313eafdb9 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java @@ -13,8 +13,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; @SpringBootApplication public class KafkaApplication { @@ -98,7 +101,20 @@ public class KafkaApplication { private String greetingTopicName; public void sendMessage(String message) { - kafkaTemplate.send(topicName, message); + + ListenableFuture> future = kafkaTemplate.send(topicName, message); + + future.addCallback(new ListenableFutureCallback>() { + + @Override + public void onSuccess(SendResult result) { + System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); + } + @Override + public void onFailure(Throwable ex) { + System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); + } + }); } public void sendMessageToPartion(String message, int partition) { diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java new file mode 100644 index 0000000000..a3426e78a3 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -0,0 +1,57 @@ +package com.baeldung.spring.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +@Configuration +public class KafkaTopicConfig { + + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + + @Value(value = "${message.topic.name}") + private String topicName; + + @Value(value = "${partitioned.topic.name}") + private String partionedTopicName; + + @Value(value = "${filtered.topic.name}") + private String filteredTopicName; + + @Value(value = "${greeting.topic.name}") + private String greetingTopicName; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic topic1() { + return new NewTopic(topicName, 1, (short) 1); + } + + @Bean + public NewTopic topic2() { + return new NewTopic(partionedTopicName, 6, (short) 1); + } + + @Bean + public NewTopic topic3() { + return new NewTopic(filteredTopicName, 1, (short) 1); + } + + @Bean + public NewTopic topic4() { + return new NewTopic(greetingTopicName, 1, (short) 1); + } +} diff --git a/spring-kafka/src/test/java/org/baeldung/SpringContextLiveTest.java b/spring-kafka/src/test/java/org/baeldung/SpringContextLiveTest.java new file mode 100644 index 0000000000..d8fb3131f5 --- /dev/null +++ b/spring-kafka/src/test/java/org/baeldung/SpringContextLiveTest.java @@ -0,0 +1,17 @@ +package org.baeldung; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import com.baeldung.spring.kafka.KafkaApplication; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = KafkaApplication.class) +public class SpringContextLiveTest { + + @Test + public void whenSpringContextIsBootstrapped_thenNoExceptions() { + } +}