BAEL-578: Add spring-kafka module (#1407)
This commit is contained in:
		
							parent
							
								
									6aefd62288
								
							
						
					
					
						commit
						dbc2c49fe2
					
				
							
								
								
									
										9
									
								
								spring-kafka/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								spring-kafka/README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,9 @@ | ||||
| # Spring Kakfa | ||||
| 
 | ||||
| This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka. | ||||
| 
 | ||||
| As Kafka topics are not created automatically by default, this application requires that a topic named 'baeldung' is created manually. | ||||
| 
 | ||||
| `$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung` | ||||
| 
 | ||||
| Two listeners with group Ids **foo** and **bar** are configured. When run successfully, the *Hello World!* message will be received by both the listeners and logged on console. | ||||
							
								
								
									
										46
									
								
								spring-kafka/pom.xml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								spring-kafka/pom.xml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,46 @@ | ||||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||
|     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||
|     <modelVersion>4.0.0</modelVersion> | ||||
| 
 | ||||
|     <groupId>com.baeldung</groupId> | ||||
|     <artifactId>spring-kafka</artifactId> | ||||
|     <version>0.0.1-SNAPSHOT</version> | ||||
| 
 | ||||
|     <name>spring-kafka</name> | ||||
|     <description>Intro to Kafka with Spring</description> | ||||
| 
 | ||||
|     <properties> | ||||
|         <java.version>1.8</java.version> | ||||
|         <spring-kafka.version>1.1.3.RELEASE</spring-kafka.version> | ||||
|     </properties> | ||||
| 
 | ||||
|     <parent> | ||||
|         <groupId>org.springframework.boot</groupId> | ||||
|         <artifactId>spring-boot-starter-parent</artifactId> | ||||
|         <version>1.5.2.RELEASE</version> | ||||
|     </parent> | ||||
| 
 | ||||
|     <dependencies> | ||||
|          | ||||
|         <dependency> | ||||
|             <groupId>org.springframework.boot</groupId> | ||||
|             <artifactId>spring-boot-starter</artifactId> | ||||
|         </dependency> | ||||
|      | ||||
|         <dependency> | ||||
|             <groupId>org.springframework.kafka</groupId> | ||||
|             <artifactId>spring-kafka</artifactId> | ||||
|         </dependency> | ||||
|      | ||||
|     </dependencies> | ||||
| 
 | ||||
|     <build> | ||||
|         <plugins> | ||||
|             <plugin> | ||||
|                 <groupId>org.springframework.boot</groupId> | ||||
|                 <artifactId>spring-boot-maven-plugin</artifactId> | ||||
|             </plugin> | ||||
|         </plugins> | ||||
|     </build> | ||||
| 
 | ||||
| </project> | ||||
| @ -0,0 +1,72 @@ | ||||
| package com.baeldung.spring.kafka; | ||||
| 
 | ||||
| import java.util.concurrent.CountDownLatch; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| 
 | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| import org.springframework.beans.factory.annotation.Value; | ||||
| import org.springframework.boot.SpringApplication; | ||||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
| import org.springframework.context.ConfigurableApplicationContext; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.kafka.annotation.KafkaListener; | ||||
| import org.springframework.kafka.core.KafkaTemplate; | ||||
| 
 | ||||
| @SpringBootApplication | ||||
| public class KafkaApplication { | ||||
| 
 | ||||
|     public static void main(String[] args) throws Exception { | ||||
|         ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args); | ||||
|         MessageProducer producer = context.getBean(MessageProducer.class); | ||||
|         producer.sendMessage("Hello, World!"); | ||||
| 
 | ||||
|         MessageListener listener = context.getBean(MessageListener.class); | ||||
|         listener.latch.await(20, TimeUnit.SECONDS); | ||||
|         Thread.sleep(60000); | ||||
|         context.close(); | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     @Bean | ||||
|     public MessageProducer messageProducer() { | ||||
|         return new MessageProducer(); | ||||
|     } | ||||
| 
 | ||||
|     @Bean | ||||
|     public MessageListener messageListener() { | ||||
|         return new MessageListener(); | ||||
|     } | ||||
| 
 | ||||
|     public static class MessageProducer { | ||||
| 
 | ||||
|         @Autowired | ||||
|         private KafkaTemplate<String, String> kafkaTemplate; | ||||
| 
 | ||||
|         @Value(value = "${message.topic.name}") | ||||
|         private String topicName; | ||||
| 
 | ||||
|         public void sendMessage(String message) { | ||||
|             kafkaTemplate.send(topicName, message); | ||||
|         } | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     public static class MessageListener { | ||||
| 
 | ||||
|         private CountDownLatch latch = new CountDownLatch(2); | ||||
| 
 | ||||
|         @KafkaListener(topics = "${message.topic.name}", group = "foo", containerFactory = "fooKafkaListenerContainerFactory") | ||||
|         public void listenGroupFoo(String message) { | ||||
|             System.out.println("Received Messasge in group 'foo': " + message); | ||||
|             latch.countDown(); | ||||
|         } | ||||
| 
 | ||||
|         @KafkaListener(topics = "${message.topic.name}", group = "bar", containerFactory = "barKafkaListenerContainerFactory") | ||||
|         public void listenGroupBar(String message) { | ||||
|             System.out.println("Received Messasge in group 'bar': " + message); | ||||
|             latch.countDown(); | ||||
|         } | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,45 @@ | ||||
| package com.baeldung.spring.kafka; | ||||
| 
 | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
| 
 | ||||
| import org.apache.kafka.clients.consumer.ConsumerConfig; | ||||
| import org.apache.kafka.common.serialization.StringDeserializer; | ||||
| import org.springframework.beans.factory.annotation.Value; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.context.annotation.Configuration; | ||||
| import org.springframework.kafka.annotation.EnableKafka; | ||||
| import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | ||||
| import org.springframework.kafka.core.ConsumerFactory; | ||||
| import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | ||||
| 
 | ||||
| @EnableKafka | ||||
| @Configuration | ||||
| public class KafkaConsumerConfig { | ||||
| 
 | ||||
|     @Value(value = "${kafka.bootstrapAddress}") | ||||
|     private String bootstrapAddress; | ||||
| 
 | ||||
|     public ConsumerFactory<String, String> consumerFactory(String groupId) { | ||||
|         Map<String, Object> props = new HashMap<>(); | ||||
|         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); | ||||
|         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | ||||
|         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||||
|         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||||
|         return new DefaultKafkaConsumerFactory<>(props); | ||||
|     } | ||||
| 
 | ||||
|     @Bean | ||||
|     public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() { | ||||
|         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | ||||
|         factory.setConsumerFactory(consumerFactory("foo")); | ||||
|         return factory; | ||||
|     } | ||||
|      | ||||
|     @Bean | ||||
|     public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() { | ||||
|         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | ||||
|         factory.setConsumerFactory(consumerFactory("bar")); | ||||
|         return factory; | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,36 @@ | ||||
| package com.baeldung.spring.kafka; | ||||
| 
 | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
| 
 | ||||
| import org.apache.kafka.clients.producer.ProducerConfig; | ||||
| import org.apache.kafka.common.serialization.StringSerializer; | ||||
| import org.springframework.beans.factory.annotation.Value; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.context.annotation.Configuration; | ||||
| import org.springframework.kafka.core.DefaultKafkaProducerFactory; | ||||
| import org.springframework.kafka.core.KafkaTemplate; | ||||
| import org.springframework.kafka.core.ProducerFactory; | ||||
| 
 | ||||
| @Configuration | ||||
| public class KafkaProducerConfig { | ||||
| 
 | ||||
|     @Value(value = "${kafka.bootstrapAddress}") | ||||
|     private String bootstrapAddress; | ||||
| 
 | ||||
|     @Bean | ||||
|     public ProducerFactory<String, String> producerFactory() { | ||||
|         Map<String, Object> configProps = new HashMap<String, Object>(); | ||||
|         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); | ||||
|         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||||
|         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||||
|         return new DefaultKafkaProducerFactory<String, String>(configProps); | ||||
|     } | ||||
| 
 | ||||
|     @Bean | ||||
|     public KafkaTemplate<String, String> kafkaTemplate() { | ||||
|         KafkaTemplate<String, String> template =  | ||||
|             new KafkaTemplate<String, String>(producerFactory()); | ||||
|         return template; | ||||
|     } | ||||
| } | ||||
							
								
								
									
										2
									
								
								spring-kafka/src/main/resources/application.properties
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								spring-kafka/src/main/resources/application.properties
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,2 @@ | ||||
| kafka.bootstrapAddress=localhost:9092 | ||||
| message.topic.name=baeldung | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user