[BAEL-11415] - Initial commit with sparing-kafka version and topic creation configuration onstatup
This commit is contained in:
parent
e4dd6e0b6a
commit
e89c0948d8
|
@ -33,7 +33,7 @@
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<spring-kafka.version>1.1.3.RELEASE</spring-kafka.version>
|
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
|
||||||
<jackson.version>2.9.7</jackson.version>
|
<jackson.version>2.9.7</jackson.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
|
|
@ -13,8 +13,11 @@ import org.springframework.kafka.annotation.KafkaListener;
|
||||||
import org.springframework.kafka.annotation.TopicPartition;
|
import org.springframework.kafka.annotation.TopicPartition;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.support.KafkaHeaders;
|
import org.springframework.kafka.support.KafkaHeaders;
|
||||||
|
import org.springframework.kafka.support.SendResult;
|
||||||
import org.springframework.messaging.handler.annotation.Header;
|
import org.springframework.messaging.handler.annotation.Header;
|
||||||
import org.springframework.messaging.handler.annotation.Payload;
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
|
import org.springframework.util.concurrent.ListenableFuture;
|
||||||
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class KafkaApplication {
|
public class KafkaApplication {
|
||||||
|
@ -98,7 +101,20 @@ public class KafkaApplication {
|
||||||
private String greetingTopicName;
|
private String greetingTopicName;
|
||||||
|
|
||||||
public void sendMessage(String message) {
|
public void sendMessage(String message) {
|
||||||
kafkaTemplate.send(topicName, message);
|
|
||||||
|
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
|
||||||
|
|
||||||
|
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult<String, String> result) {
|
||||||
|
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable ex) {
|
||||||
|
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessageToPartion(String message, int partition) {
|
public void sendMessageToPartion(String message, int partition) {
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaTopicConfig {
|
||||||
|
|
||||||
|
@Value(value = "${kafka.bootstrapAddress}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Value(value = "${message.topic.name}")
|
||||||
|
private String topicName;
|
||||||
|
|
||||||
|
@Value(value = "${partitioned.topic.name}")
|
||||||
|
private String partionedTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${filtered.topic.name}")
|
||||||
|
private String filteredTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${greeting.topic.name}")
|
||||||
|
private String greetingTopicName;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin kafkaAdmin() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
return new KafkaAdmin(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic1() {
|
||||||
|
return new NewTopic(topicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic2() {
|
||||||
|
return new NewTopic(partionedTopicName, 6, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic3() {
|
||||||
|
return new NewTopic(filteredTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic4() {
|
||||||
|
return new NewTopic(greetingTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
package org.baeldung;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
|
||||||
|
import com.baeldung.spring.kafka.KafkaApplication;
|
||||||
|
|
||||||
|
@RunWith(SpringRunner.class)
|
||||||
|
@SpringBootTest(classes = KafkaApplication.class)
|
||||||
|
public class SpringContextLiveTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue