[BAEL-14274] - Fixed article code for https://www.baeldung.com/spring-kafka

This commit is contained in:
amit2103 2019-06-30 01:05:30 +05:30
parent 883f0a525b
commit 57adf2dcfe
1 changed files with 11 additions and 14 deletions

View File

@ -29,39 +29,36 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props); return new DefaultKafkaConsumerFactory<>(props);
} }
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("foo");
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("bar");
factory.setConsumerFactory(consumerFactory("bar"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("headers");
factory.setConsumerFactory(consumerFactory("headers"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("partitions");
factory.setConsumerFactory(consumerFactory("partitions"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setConsumerFactory(consumerFactory("filter"));
factory.setRecordFilterStrategy(record -> record.value() factory.setRecordFilterStrategy(record -> record.value()
.contains("World")); .contains("World"));
return factory; return factory;