From dbc2c49fe2af9ac0bfb7def7fa3730e8f4e6727f Mon Sep 17 00:00:00 2001 From: Vivek Kumar Date: Fri, 17 Mar 2017 00:04:53 +0530 Subject: [PATCH] BAEL-578: Add spring-kafka module (#1407) --- spring-kafka/README.md | 9 +++ spring-kafka/pom.xml | 46 ++++++++++++ .../spring/kafka/KafkaApplication.java | 72 +++++++++++++++++++ .../spring/kafka/KafkaConsumerConfig.java | 45 ++++++++++++ .../spring/kafka/KafkaProducerConfig.java | 36 ++++++++++ .../src/main/resources/application.properties | 2 + 6 files changed, 210 insertions(+) create mode 100644 spring-kafka/README.md create mode 100644 spring-kafka/pom.xml create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java create mode 100644 spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java create mode 100644 spring-kafka/src/main/resources/application.properties diff --git a/spring-kafka/README.md b/spring-kafka/README.md new file mode 100644 index 0000000000..2731eca042 --- /dev/null +++ b/spring-kafka/README.md @@ -0,0 +1,9 @@ +# Spring Kakfa + +This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka. + +As Kafka topics are not created automatically by default, this application requires that a topic named 'baeldung' is created manually. + +`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung` + +Two listeners with group Ids **foo** and **bar** are configured. When run successfully, the *Hello World!* message will be received by both the listeners and logged on console. diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml new file mode 100644 index 0000000000..73eaf3acff --- /dev/null +++ b/spring-kafka/pom.xml @@ -0,0 +1,46 @@ + + 4.0.0 + + com.baeldung + spring-kafka + 0.0.1-SNAPSHOT + + spring-kafka + Intro to Kafka with Spring + + + 1.8 + 1.1.3.RELEASE + + + + org.springframework.boot + spring-boot-starter-parent + 1.5.2.RELEASE + + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java new file mode 100644 index 0000000000..252054a9f1 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java @@ -0,0 +1,72 @@ +package com.baeldung.spring.kafka; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; + +@SpringBootApplication +public class KafkaApplication { + + public static void main(String[] args) throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); + MessageProducer producer = context.getBean(MessageProducer.class); + producer.sendMessage("Hello, World!"); + + MessageListener listener = context.getBean(MessageListener.class); + listener.latch.await(20, TimeUnit.SECONDS); + Thread.sleep(60000); + context.close(); + + } + + @Bean + public MessageProducer messageProducer() { + return new MessageProducer(); + } + + @Bean + public MessageListener messageListener() { + return new MessageListener(); + } + + public static class MessageProducer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Value(value = "${message.topic.name}") + private String topicName; + + public void sendMessage(String message) { + kafkaTemplate.send(topicName, message); + } + + } + + public static class MessageListener { + + private CountDownLatch latch = new CountDownLatch(2); + + @KafkaListener(topics = "${message.topic.name}", group = "foo", containerFactory = "fooKafkaListenerContainerFactory") + public void listenGroupFoo(String message) { + System.out.println("Received Messasge in group 'foo': " + message); + latch.countDown(); + } + + @KafkaListener(topics = "${message.topic.name}", group = "bar", containerFactory = "barKafkaListenerContainerFactory") + public void listenGroupBar(String message) { + System.out.println("Received Messasge in group 'bar': " + message); + latch.countDown(); + } + + } + +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000000..f9edda2435 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -0,0 +1,45 @@ +package com.baeldung.spring.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + + public ConsumerFactory consumerFactory(String groupId) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory fooKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory("foo")); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory barKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory("bar")); + return factory; + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000000..4f9f9719ee --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java @@ -0,0 +1,36 @@ +package com.baeldung.spring.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Configuration +public class KafkaProducerConfig { + + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + KafkaTemplate template = + new KafkaTemplate(producerFactory()); + return template; + } +} diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties new file mode 100644 index 0000000000..a1d73b204c --- /dev/null +++ b/spring-kafka/src/main/resources/application.properties @@ -0,0 +1,2 @@ +kafka.bootstrapAddress=localhost:9092 +message.topic.name=baeldung