Merge pull request #8782 from srzamfir/feature/BAEL-3185_Kafka_not_strating

Feature/bael 3185 kafka not strating
This commit is contained in:
Eric Martin 2020-02-28 00:25:37 -06:00 committed by GitHub
commit 8bd02c32e3
4 changed files with 24 additions and 22 deletions

View File

@@ -25,7 +25,7 @@ public class KafkaApplication {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
        MessageProducer producer = context.getBean(MessageProducer.class);
        MessageListener listener = context.getBean(MessageListener.class);
        /*
@@ -101,15 +101,17 @@ public class KafkaApplication {
        private String greetingTopicName;
        public void sendMessage(String message) {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
-                   System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
+                   System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
+                       .offset() + "]");
                }
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
@@ -158,7 +160,7 @@ public class KafkaApplication {
            latch.countDown();
        }
-       @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }))
+       @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
        public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println("Received Message: " + message + " from partition: " + partition);
            this.partitionLatch.countDown();
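This is the substantive change in the file: the partition-specific listener now names its container factory explicitly instead of relying on the default kafkaListenerContainerFactory bean. A minimal standalone sketch of how the annotation and the partitionsKafkaListenerContainerFactory bean from KafkaConsumerConfig pair up (the class name and latch count are illustrative assumptions, not code from this PR):

import java.util.concurrent.CountDownLatch;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

public class PartitionAwareListener {

    // Lets a caller block until the expected number of records has arrived.
    private final CountDownLatch partitionLatch = new CountDownLatch(2);

    // containerFactory must match the bean name of a
    // ConcurrentKafkaListenerContainerFactory; here it points at the
    // "partitions" factory defined in KafkaConsumerConfig.
    @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }),
        containerFactory = "partitionsKafkaListenerContainerFactory")
    public void listenToPartition(@Payload String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println("Received Message: " + message + " from partition: " + partition);
        partitionLatch.countDown();
    }
}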

View File

@@ -29,7 +29,7 @@ public class KafkaConsumerConfig {
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
@@ -50,12 +50,12 @@ public class KafkaConsumerConfig {
    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");

View File

@@ -32,7 +32,7 @@ public class KafkaProducerConfig {
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
@@ -41,10 +41,10 @@ public class KafkaProducerConfig {
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}
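greetingKafkaTemplate() pairs the JSON-serializing producer factory with a template typed to the module's Greeting payload. A hypothetical usage sketch (GreetingSender and sendGreeting are illustrative names, and Greeting is assumed to be the existing POJO from this module):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class GreetingSender {

    // Typed against Greeting, so it picks up the greetingProducerFactory()
    // configuration with JsonSerializer as the value serializer.
    @Autowired
    private KafkaTemplate<String, Greeting> greetingKafkaTemplate;

    // The Greeting instance is serialized to JSON when the record is sent.
    public void sendGreeting(String topic, Greeting greeting) {
        greetingKafkaTemplate.send(topic, greeting);
    }
}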

View File

@@ -12,10 +12,10 @@ import org.springframework.kafka.core.KafkaAdmin;
@Configuration
public class KafkaTopicConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    @Value(value = "${message.topic.name}")
    private String topicName;
@@ -27,31 +27,31 @@ public class KafkaTopicConfig {
    @Value(value = "${greeting.topic.name}")
    private String greetingTopicName;
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    @Bean
    public NewTopic topic1() {
        return new NewTopic(topicName, 1, (short) 1);
    }
    @Bean
    public NewTopic topic2() {
        return new NewTopic(partionedTopicName, 6, (short) 1);
    }
    @Bean
    public NewTopic topic3() {
        return new NewTopic(filteredTopicName, 1, (short) 1);
    }
    @Bean
    public NewTopic topic4() {
        return new NewTopic(greetingTopicName, 1, (short) 1);
    }
}
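For reference, KafkaAdmin scans the application context for NewTopic beans at startup and creates any topic that does not already exist; the constructor arguments are the topic name, the number of partitions, and the replication factor. A minimal sketch with an illustrative topic name (a single-broker setup can only satisfy a replication factor of 1):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExampleTopicConfig {

    @Bean
    public NewTopic exampleTopic() {
        // 6 partitions, replication factor 1: records are spread across the
        // partitions, but each partition lives on only one broker.
        return new NewTopic("example.topic", 6, (short) 1);
    }
}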