From 57adf2dcfe37dcf3eba50348fc0b864d8400777b Mon Sep 17 00:00:00 2001 From: amit2103 Date: Sun, 30 Jun 2019 01:05:30 +0530 Subject: [PATCH] [BAEL-14274] - Fixed article code for https://www.baeldung.com/spring-kafka --- .../spring/kafka/KafkaConsumerConfig.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index 9353e63ff6..933d2353aa 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -29,39 +29,36 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } + + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(String groupId) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(groupId)); + return factory; + } @Bean public ConcurrentKafkaListenerContainerFactory fooKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("foo")); - return factory; + return kafkaListenerContainerFactory("foo"); } @Bean public ConcurrentKafkaListenerContainerFactory barKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("bar")); - return factory; + return kafkaListenerContainerFactory("bar"); } @Bean public ConcurrentKafkaListenerContainerFactory headersKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("headers")); - return factory; + return kafkaListenerContainerFactory("headers"); } @Bean public ConcurrentKafkaListenerContainerFactory partitionsKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("partitions")); - return factory; + return kafkaListenerContainerFactory("partitions"); } @Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory("filter")); + ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); factory.setRecordFilterStrategy(record -> record.value() .contains("World")); return factory;