diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java index b313eafdb9..fde56bebc0 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java @@ -25,7 +25,7 @@ public class KafkaApplication { public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); - + MessageProducer producer = context.getBean(MessageProducer.class); MessageListener listener = context.getBean(MessageListener.class); /* @@ -101,15 +101,17 @@ public class KafkaApplication { private String greetingTopicName; public void sendMessage(String message) { - + ListenableFuture> future = kafkaTemplate.send(topicName, message); - + future.addCallback(new ListenableFutureCallback>() { @Override public void onSuccess(SendResult result) { - System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); + System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata() + .offset() + "]"); } + @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); @@ -158,7 +160,7 @@ public class KafkaApplication { latch.countDown(); } - @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" })) + @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println("Received Message: " + message + " from partition: " + partition); this.partitionLatch.countDown(); diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index 933d2353aa..abaa431eec 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -29,7 +29,7 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } - + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(String groupId) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(groupId)); @@ -50,12 +50,12 @@ public class KafkaConsumerConfig { public ConcurrentKafkaListenerContainerFactory headersKafkaListenerContainerFactory() { return kafkaListenerContainerFactory("headers"); } - + @Bean public ConcurrentKafkaListenerContainerFactory partitionsKafkaListenerContainerFactory() { return kafkaListenerContainerFactory("partitions"); } - + @Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java index 7e2527b36e..0223bab0fe 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -32,7 +32,7 @@ public class KafkaProducerConfig { public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } - + @Bean public ProducerFactory greetingProducerFactory() { Map configProps = new HashMap<>(); @@ -41,10 +41,10 @@ public class KafkaProducerConfig { configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } - + @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate<>(greetingProducerFactory()); } - + } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java index a3426e78a3..fb60fadde4 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaTopicConfig.java @@ -12,10 +12,10 @@ import org.springframework.kafka.core.KafkaAdmin; @Configuration public class KafkaTopicConfig { - + @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; - + @Value(value = "${message.topic.name}") private String topicName; @@ -27,31 +27,31 @@ public class KafkaTopicConfig { @Value(value = "${greeting.topic.name}") private String greetingTopicName; - + @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } - + @Bean public NewTopic topic1() { - return new NewTopic(topicName, 1, (short) 1); + return new NewTopic(topicName, 1, (short) 1); } - + @Bean public NewTopic topic2() { - return new NewTopic(partionedTopicName, 6, (short) 1); + return new NewTopic(partionedTopicName, 6, (short) 1); } - + @Bean public NewTopic topic3() { - return new NewTopic(filteredTopicName, 1, (short) 1); + return new NewTopic(filteredTopicName, 1, (short) 1); } - + @Bean public NewTopic topic4() { - return new NewTopic(greetingTopicName, 1, (short) 1); + return new NewTopic(greetingTopicName, 1, (short) 1); } }