update spring-kafka project with support for multiple partitions and JSON serializer (#1472)

This commit is contained in:
Vivek Kumar 2017-03-24 02:20:30 +05:30 committed by Grzegorz Piwowarek
parent b01c2c5a94
commit e884e3f924
7 changed files with 228 additions and 16 deletions

View File

@ -2,8 +2,28 @@
This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka.
As Kafka topics are not created automatically by default, this application requires that a topic named 'baeldung' is created manually.
As Kafka topics are not created automatically by default, this application requires that you create the following topics manually.
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`<br>
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic partitioned`<br>
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filtered`<br>
`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic greeting`<br>
Two listeners with group Ids **foo** and **bar** are configured. When run successfully, the *Hello World!* message will be received by both the listeners and logged on console.
When the application runs successfully, following output is logged on to console (along with spring logs):
#### Message received from the 'baeldung' topic by the basic listeners with groups foo and bar
>Received Messasge in group 'foo': Hello, World!<br>
Received Messasge in group 'bar': Hello, World!
#### Message received from the 'baeldung' topic, with the partition info
>Received Messasge: Hello, World! from partition: 0
#### Message received from the 'partitioned' topic, only from specific partitions
>Received Message: Hello To Partioned Topic! from partition: 0<br>
Received Message: Hello To Partioned Topic! from partition: 3
#### Message received from the 'filtered' topic after filtering
>Recieved Message in filtered listener: Hello Baeldung!
#### Message (Serialized Java Object) received from the 'greeting' topic
>Recieved greeting message: Greetings, World!!

View File

@ -12,6 +12,7 @@
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.1.3.RELEASE</spring-kafka.version>
<jackson.version>2.6.7</jackson.version>
</properties>
<parent>
@ -21,17 +22,22 @@
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,37 @@
package com.baeldung.spring.kafka;
public class Greeting {
private String msg;
private String name;
public Greeting() {
}
public Greeting(String msg, String name) {
this.msg = msg;
this.name = name;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return msg + ", " + name + "!";
}
}

View File

@ -10,21 +10,61 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
MessageProducer producer = context.getBean(MessageProducer.class);
producer.sendMessage("Hello, World!");
MessageListener listener = context.getBean(MessageListener.class);
listener.latch.await(20, TimeUnit.SECONDS);
Thread.sleep(60000);
context.close();
/*
* Sending a Hello World message to topic 'baeldung'.
* Must be recieved by both listeners with group foo
* and bar with containerFactory fooKafkaListenerContainerFactory
* and barKafkaListenerContainerFactory respectively.
* It will also be recieved by the listener with
* headersKafkaListenerContainerFactory as container factory
*/
producer.sendMessage("Hello, World!");
listener.latch.await(10, TimeUnit.SECONDS);
/*
* Sending message to a topic with 5 partition,
* each message to a different partition. But as per
* listener configuration, only the messages from
* partition 0 and 3 will be consumed.
*/
for (int i = 0; i < 5; i++) {
producer.sendMessageToPartion("Hello To Partioned Topic!", i);
}
listener.partitionLatch.await(10, TimeUnit.SECONDS);
/*
* Sending message to 'filtered' topic. As per listener
* configuration, all messages with char sequence
* 'World' will be discarded.
*/
producer.sendMessageToFiltered("Hello Baeldung!");
producer.sendMessageToFiltered("Hello World!");
listener.filterLatch.await(10, TimeUnit.SECONDS);
/*
* Sending message to 'greeting' topic. This will send
* and recieved a java object with the help of
* greetingKafkaListenerContainerFactory.
*/
producer.sendGreetingMessage(new Greeting("Greetings", "World!"));
listener.greetingLatch.await(10, TimeUnit.SECONDS);
context.close();
}
@Bean
@ -42,18 +82,47 @@ public class KafkaApplication {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaTemplate<String, Greeting> greetingKafkaTemplate;
@Value(value = "${message.topic.name}")
private String topicName;
@Value(value = "${partitioned.topic.name}")
private String partionedTopicName;
@Value(value = "${filtered.topic.name}")
private String filteredTopicName;
@Value(value = "${greeting.topic.name}")
private String greetingTopicName;
public void sendMessage(String message) {
kafkaTemplate.send(topicName, message);
}
public void sendMessageToPartion(String message, int partition) {
kafkaTemplate.send(partionedTopicName, partition, message);
}
public void sendMessageToFiltered(String message) {
kafkaTemplate.send(filteredTopicName, message);
}
public void sendGreetingMessage(Greeting greeting) {
greetingKafkaTemplate.send(greetingTopicName, greeting);
}
}
public static class MessageListener {
private CountDownLatch latch = new CountDownLatch(2);
private CountDownLatch latch = new CountDownLatch(3);
private CountDownLatch partitionLatch = new CountDownLatch(2);
private CountDownLatch filterLatch = new CountDownLatch(2);
private CountDownLatch greetingLatch = new CountDownLatch(1);
@KafkaListener(topics = "${message.topic.name}", group = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
@ -67,6 +136,30 @@ public class KafkaApplication {
latch.countDown();
}
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Messasge: " + message + " from partition: " + partition);
latch.countDown();
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }))
public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition);
this.partitionLatch.countDown();
}
@KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Recieved Message in filtered listener: " + message);
this.filterLatch.countDown();
}
@KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
System.out.println("Recieved greeting message: " + greeting);
this.greetingLatch.countDown();
}
}
}

View File

@ -12,6 +12,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@EnableKafka
@Configuration
@ -35,11 +36,49 @@ public class KafkaConsumerConfig {
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("bar"));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("headers"));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("partitions"));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("filter"));
factory.setRecordFilterStrategy(record -> record.value()
.contains("World"));
return factory;
}
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
}

View File

@ -11,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@ -29,8 +30,21 @@ public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template =
new KafkaTemplate<String, String>(producerFactory());
return template;
return new KafkaTemplate<String, String>(producerFactory());
}
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
Map<String, Object> configProps = new HashMap<String, Object>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<String, Greeting>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<String, Greeting>(greetingProducerFactory());
}
}

View File

@ -1,2 +1,5 @@
kafka.bootstrapAddress=localhost:9092
message.topic.name=baeldung
greeting.topic.name=greeting
filtered.topic.name=filtered
partitioned.topic.name=partitioned