Merge pull request #6090 from amit2103/BAEL-11415
[BAEL-11415] - Initial commit with sparing-kafka version and topic cr…
This commit is contained in:
		
						commit
						feccd5d345
					
				| @ -33,7 +33,7 @@ | |||||||
|     </dependencies> |     </dependencies> | ||||||
| 
 | 
 | ||||||
|     <properties> |     <properties> | ||||||
|         <spring-kafka.version>1.1.3.RELEASE</spring-kafka.version> |         <spring-kafka.version>2.2.2.RELEASE</spring-kafka.version> | ||||||
|         <jackson.version>2.9.7</jackson.version> |         <jackson.version>2.9.7</jackson.version> | ||||||
|     </properties> |     </properties> | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -13,8 +13,11 @@ import org.springframework.kafka.annotation.KafkaListener; | |||||||
| import org.springframework.kafka.annotation.TopicPartition; | import org.springframework.kafka.annotation.TopicPartition; | ||||||
| import org.springframework.kafka.core.KafkaTemplate; | import org.springframework.kafka.core.KafkaTemplate; | ||||||
| import org.springframework.kafka.support.KafkaHeaders; | import org.springframework.kafka.support.KafkaHeaders; | ||||||
|  | import org.springframework.kafka.support.SendResult; | ||||||
| import org.springframework.messaging.handler.annotation.Header; | import org.springframework.messaging.handler.annotation.Header; | ||||||
| import org.springframework.messaging.handler.annotation.Payload; | import org.springframework.messaging.handler.annotation.Payload; | ||||||
|  | import org.springframework.util.concurrent.ListenableFuture; | ||||||
|  | import org.springframework.util.concurrent.ListenableFutureCallback; | ||||||
| 
 | 
 | ||||||
| @SpringBootApplication | @SpringBootApplication | ||||||
| public class KafkaApplication { | public class KafkaApplication { | ||||||
| @ -98,7 +101,20 @@ public class KafkaApplication { | |||||||
|         private String greetingTopicName; |         private String greetingTopicName; | ||||||
| 
 | 
 | ||||||
|         public void sendMessage(String message) { |         public void sendMessage(String message) { | ||||||
|             kafkaTemplate.send(topicName, message); |              | ||||||
|  |             ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); | ||||||
|  |              | ||||||
|  |             future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { | ||||||
|  | 
 | ||||||
|  |                 @Override | ||||||
|  |                 public void onSuccess(SendResult<String, String> result) { | ||||||
|  |                     System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); | ||||||
|  |                 } | ||||||
|  |                 @Override | ||||||
|  |                 public void onFailure(Throwable ex) { | ||||||
|  |                     System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); | ||||||
|  |                 } | ||||||
|  |             }); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         public void sendMessageToPartion(String message, int partition) { |         public void sendMessageToPartion(String message, int partition) { | ||||||
|  | |||||||
| @ -0,0 +1,57 @@ | |||||||
|  | package com.baeldung.spring.kafka; | ||||||
|  | 
 | ||||||
|  | import java.util.HashMap; | ||||||
|  | import java.util.Map; | ||||||
|  | 
 | ||||||
|  | import org.apache.kafka.clients.admin.AdminClientConfig; | ||||||
|  | import org.apache.kafka.clients.admin.NewTopic; | ||||||
|  | import org.springframework.beans.factory.annotation.Value; | ||||||
|  | import org.springframework.context.annotation.Bean; | ||||||
|  | import org.springframework.context.annotation.Configuration; | ||||||
|  | import org.springframework.kafka.core.KafkaAdmin; | ||||||
|  | 
 | ||||||
|  | @Configuration | ||||||
|  | public class KafkaTopicConfig { | ||||||
|  |      | ||||||
|  |     @Value(value = "${kafka.bootstrapAddress}") | ||||||
|  |     private String bootstrapAddress; | ||||||
|  |      | ||||||
|  |     @Value(value = "${message.topic.name}") | ||||||
|  |     private String topicName; | ||||||
|  | 
 | ||||||
|  |     @Value(value = "${partitioned.topic.name}") | ||||||
|  |     private String partionedTopicName; | ||||||
|  | 
 | ||||||
|  |     @Value(value = "${filtered.topic.name}") | ||||||
|  |     private String filteredTopicName; | ||||||
|  | 
 | ||||||
|  |     @Value(value = "${greeting.topic.name}") | ||||||
|  |     private String greetingTopicName; | ||||||
|  |      | ||||||
|  |     @Bean | ||||||
|  |     public KafkaAdmin kafkaAdmin() { | ||||||
|  |         Map<String, Object> configs = new HashMap<>(); | ||||||
|  |         configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); | ||||||
|  |         return new KafkaAdmin(configs); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Bean | ||||||
|  |     public NewTopic topic1() { | ||||||
|  |          return new NewTopic(topicName, 1, (short) 1); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Bean | ||||||
|  |     public NewTopic topic2() { | ||||||
|  |          return new NewTopic(partionedTopicName, 6, (short) 1); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Bean | ||||||
|  |     public NewTopic topic3() { | ||||||
|  |          return new NewTopic(filteredTopicName, 1, (short) 1); | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     @Bean | ||||||
|  |     public NewTopic topic4() { | ||||||
|  |          return new NewTopic(greetingTopicName, 1, (short) 1); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -0,0 +1,17 @@ | |||||||
|  | package org.baeldung; | ||||||
|  | 
 | ||||||
|  | import org.junit.Test; | ||||||
|  | import org.junit.runner.RunWith; | ||||||
|  | import org.springframework.boot.test.context.SpringBootTest; | ||||||
|  | import org.springframework.test.context.junit4.SpringRunner; | ||||||
|  | 
 | ||||||
|  | import com.baeldung.spring.kafka.KafkaApplication; | ||||||
|  | 
 | ||||||
|  | @RunWith(SpringRunner.class) | ||||||
|  | @SpringBootTest(classes = KafkaApplication.class) | ||||||
|  | public class SpringContextLiveTest { | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void whenSpringContextIsBootstrapped_thenNoExceptions() { | ||||||
|  |     } | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user