Merge pull request #7217 from amit2103/BAEL-14274-20

[BAEL-14274] - Fixed article code for https://www.baeldung.com/spring…
This commit is contained in:
Loredana Crusoveanu 2019-07-20 21:47:17 +03:00 committed by GitHub
commit 0c0aff29df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,39 +29,36 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props); return new DefaultKafkaConsumerFactory<>(props);
} }
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(groupId));
return factory;
}
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("foo");
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("bar");
factory.setConsumerFactory(consumerFactory("bar"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("headers");
factory.setConsumerFactory(consumerFactory("headers"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); return kafkaListenerContainerFactory("partitions");
factory.setConsumerFactory(consumerFactory("partitions"));
return factory;
} }
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setConsumerFactory(consumerFactory("filter"));
factory.setRecordFilterStrategy(record -> record.value() factory.setRecordFilterStrategy(record -> record.value()
.contains("World")); .contains("World"));
return factory; return factory;