BAEL-6057 - Implementing Retry In Kafka Consumer (#13274)
* BAEL-6057 - Implementing Retry In Kafka Consumer (moved code example to new module) * BAEL-6057 - Implementing Retry In Kafka Consumer (fix on README.md) Co-authored-by: Cesare <cesare.valenti@hotmail.com>
This commit is contained in:
parent
7a202296a7
commit
f396ff0341
7
spring-kafka-2/README.md
Normal file
7
spring-kafka-2/README.md
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
## Spring Kafka 2
|
||||||
|
|
||||||
|
This module contains articles about Spring with Kafka
|
||||||
|
|
||||||
|
### Relevant articles
|
||||||
|
|
||||||
|
- [Implementing Retry In Kafka Consumer]
|
70
spring-kafka-2/pom.xml
Normal file
70
spring-kafka-2/pom.xml
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
<name>spring-kafka</name>
|
||||||
|
<description>Intro to Kafka with Spring</description>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-boot-2</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../parent-boot-2</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-streams</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>kafka</artifactId>
|
||||||
|
<version>${testcontainers-kafka.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>junit-jupiter</artifactId>
|
||||||
|
<version>${testcontainers-kafka.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<testcontainers-kafka.version>1.16.2</testcontainers-kafka.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
</project>
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
public class Farewell {
|
||||||
|
|
||||||
|
private String message;
|
||||||
|
private Integer remainingMinutes;
|
||||||
|
|
||||||
|
public Farewell() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Farewell(String message, Integer remainingMinutes) {
|
||||||
|
this.message = message;
|
||||||
|
this.remainingMinutes = remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getRemainingMinutes() {
|
||||||
|
return remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRemainingMinutes(Integer remainingMinutes) {
|
||||||
|
this.remainingMinutes = remainingMinutes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return message + ". In " + remainingMinutes + "!";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
public class Greeting {
|
||||||
|
|
||||||
|
private String msg;
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
public Greeting() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public Greeting(String msg, String name) {
|
||||||
|
this.msg = msg;
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMsg() {
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMsg(String msg) {
|
||||||
|
this.msg = msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setName(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return msg + ", " + name + "!";
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,152 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.listener.ContainerProperties;
|
||||||
|
import org.springframework.kafka.listener.DefaultErrorHandler;
|
||||||
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||||
|
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
||||||
|
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
||||||
|
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
import org.springframework.util.backoff.BackOff;
|
||||||
|
import org.springframework.util.backoff.FixedBackOff;
|
||||||
|
|
||||||
|
@EnableKafka
|
||||||
|
@Configuration
|
||||||
|
public class KafkaConsumerConfig {
|
||||||
|
|
||||||
|
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Value(value = "${kafka.backoff.interval}")
|
||||||
|
private Long interval;
|
||||||
|
|
||||||
|
@Value(value = "${kafka.backoff.max_failure}")
|
||||||
|
private Long maxAttempts;
|
||||||
|
|
||||||
|
public ConsumerFactory<String, String> consumerFactory(String groupId) {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
|
||||||
|
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(consumerFactory(groupId));
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
|
||||||
|
return kafkaListenerContainerFactory("foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
|
||||||
|
return kafkaListenerContainerFactory("bar");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
|
||||||
|
return kafkaListenerContainerFactory("headers");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
|
||||||
|
return kafkaListenerContainerFactory("partitions");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> longMessageKafkaListenerContainerFactory() {
|
||||||
|
return kafkaListenerContainerFactory("longMessage");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
|
||||||
|
factory.setRecordFilterStrategy(record -> record.value()
|
||||||
|
.contains("World"));
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(greetingConsumerFactory());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RecordMessageConverter multiTypeConverter() {
|
||||||
|
StringJsonMessageConverter converter = new StringJsonMessageConverter();
|
||||||
|
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
|
||||||
|
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
|
||||||
|
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
|
||||||
|
Map<String, Class<?>> mappings = new HashMap<>();
|
||||||
|
mappings.put("greeting", Greeting.class);
|
||||||
|
mappings.put("farewell", Farewell.class);
|
||||||
|
typeMapper.setIdClassMapping(mappings);
|
||||||
|
converter.setTypeMapper(typeMapper);
|
||||||
|
return converter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
|
||||||
|
HashMap<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Primary
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(multiTypeConsumerFactory());
|
||||||
|
factory.setMessageConverter(multiTypeConverter());
|
||||||
|
factory.setCommonErrorHandler(errorHandler());
|
||||||
|
factory.getContainerProperties()
|
||||||
|
.setAckMode(ContainerProperties.AckMode.RECORD);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public DefaultErrorHandler errorHandler() {
|
||||||
|
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
|
||||||
|
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
|
||||||
|
System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
|
||||||
|
}, fixedBackOff);
|
||||||
|
//Commented because of the test
|
||||||
|
//errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class);
|
||||||
|
errorHandler.addNotRetryableExceptions(NullPointerException.class);
|
||||||
|
return errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,67 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaProducerConfig {
|
||||||
|
|
||||||
|
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, String> producerFactory() {
|
||||||
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
|
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
|
||||||
|
|
||||||
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, Greeting> greetingProducerFactory() {
|
||||||
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
|
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||||
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(greetingProducerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, Object> multiTypeProducerFactory() {
|
||||||
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
|
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||||
|
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
|
||||||
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(multiTypeProducerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,77 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaTopicConfig {
|
||||||
|
|
||||||
|
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Value(value = "${message.topic.name}")
|
||||||
|
private String topicName;
|
||||||
|
|
||||||
|
@Value(value = "${long.message.topic.name}")
|
||||||
|
private String longMsgTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${partitioned.topic.name}")
|
||||||
|
private String partitionedTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${filtered.topic.name}")
|
||||||
|
private String filteredTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${greeting.topic.name}")
|
||||||
|
private String greetingTopicName;
|
||||||
|
|
||||||
|
@Value(value = "${multi.type.topic.name}")
|
||||||
|
private String multiTypeTopicName;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin kafkaAdmin() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
return new KafkaAdmin(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic1() {
|
||||||
|
return new NewTopic(topicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic2() {
|
||||||
|
return new NewTopic(partitionedTopicName, 6, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic3() {
|
||||||
|
return new NewTopic(filteredTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic4() {
|
||||||
|
return new NewTopic(greetingTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic5() {
|
||||||
|
NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
|
||||||
|
Map<String, String> configs = new HashMap<>();
|
||||||
|
configs.put("max.message.bytes", "20971520");
|
||||||
|
newTopic.configs(configs);
|
||||||
|
return newTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic multiTypeTopic() {
|
||||||
|
return new NewTopic(multiTypeTopicName, 1, (short) 1);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
|
import org.springframework.kafka.annotation.KafkaHandler;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.messaging.MessagingException;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@KafkaListener(id = "multiGroup", topics = "multitype")
|
||||||
|
public class MultiTypeKafkaListener {
|
||||||
|
|
||||||
|
@KafkaHandler
|
||||||
|
//@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
|
||||||
|
public void handleGreeting(Greeting greeting) {
|
||||||
|
if (greeting.getName()
|
||||||
|
.equalsIgnoreCase("test")) {
|
||||||
|
throw new MessagingException("test not allowed");
|
||||||
|
}
|
||||||
|
System.out.println("Greeting received: " + greeting);
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaHandler
|
||||||
|
public void handleF(Farewell farewell) {
|
||||||
|
System.out.println("Farewell received: " + farewell);
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaHandler(isDefault = true)
|
||||||
|
public void unknown(Object object) {
|
||||||
|
System.out.println("Unkown type received: " + object);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
20
spring-kafka-2/src/main/resources/application.properties
Normal file
20
spring-kafka-2/src/main/resources/application.properties
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
spring.kafka.bootstrap-servers=localhost:9092
|
||||||
|
message.topic.name=baeldung
|
||||||
|
long.message.topic.name=longMessage
|
||||||
|
greeting.topic.name=greeting
|
||||||
|
filtered.topic.name=filtered
|
||||||
|
partitioned.topic.name=partitioned
|
||||||
|
multi.type.topic.name=multitype
|
||||||
|
# monitoring - lag analysis
|
||||||
|
monitor.kafka.bootstrap.config=localhost:9092
|
||||||
|
monitor.kafka.consumer.groupid=baeldungGrp
|
||||||
|
monitor.topic.name=baeldung
|
||||||
|
# monitoring - simulation
|
||||||
|
monitor.producer.simulate=true
|
||||||
|
monitor.consumer.simulate=true
|
||||||
|
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
|
||||||
|
test.topic=testtopic1
|
||||||
|
kafka.backoff.interval=9000
|
||||||
|
kafka.backoff.max_failure=5
|
||||||
|
|
||||||
|
|
13
spring-kafka-2/src/main/resources/logback.xml
Normal file
13
spring-kafka-2/src/main/resources/logback.xml
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<configuration>
|
||||||
|
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||||
|
<encoder>
|
||||||
|
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||||
|
</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<root level="INFO">
|
||||||
|
<appender-ref ref="STDOUT" />
|
||||||
|
</root>
|
||||||
|
</configuration>
|
14
spring-kafka-2/src/test/resources/application.yml
Normal file
14
spring-kafka-2/src/test/resources/application.yml
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
spring:
|
||||||
|
kafka:
|
||||||
|
consumer:
|
||||||
|
auto-offset-reset: earliest
|
||||||
|
group-id: baeldung
|
||||||
|
test:
|
||||||
|
topic: embedded-test-topic
|
||||||
|
|
||||||
|
monitor:
|
||||||
|
kafka:
|
||||||
|
bootstrap:
|
||||||
|
config: "PLAINTEXT://localhost:9085"
|
||||||
|
consumer:
|
||||||
|
groupid: "baeldungGrp"
|
19
spring-kafka-2/src/test/resources/logback-test.xml
Normal file
19
spring-kafka-2/src/test/resources/logback-test.xml
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<configuration>
|
||||||
|
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||||
|
<encoder>
|
||||||
|
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||||
|
</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<root level="INFO">
|
||||||
|
<appender-ref ref="STDOUT" />
|
||||||
|
</root>
|
||||||
|
|
||||||
|
<!-- Reduce the noise as the consumer keeps trying to connect until the Kafka instance is available -->
|
||||||
|
<springProfile name="ssl">
|
||||||
|
<logger name="org.apache.kafka.clients.NetworkClient" level="ERROR" additivity="false"/>
|
||||||
|
</springProfile>
|
||||||
|
|
||||||
|
</configuration>
|
@ -1,6 +1,5 @@
|
|||||||
package com.baeldung.spring.kafka;
|
package com.baeldung.spring.kafka;
|
||||||
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -9,20 +8,15 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.context.annotation.Primary;
|
|
||||||
import org.springframework.kafka.annotation.EnableKafka;
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
import org.springframework.kafka.core.ConsumerFactory;
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
import org.springframework.kafka.listener.ContainerProperties;
|
|
||||||
import org.springframework.kafka.listener.DefaultErrorHandler;
|
|
||||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||||
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
|
||||||
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
||||||
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
|
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
|
||||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
import org.springframework.util.backoff.BackOff;
|
|
||||||
import org.springframework.util.backoff.FixedBackOff;
|
|
||||||
|
|
||||||
@EnableKafka
|
@EnableKafka
|
||||||
@Configuration
|
@Configuration
|
||||||
@ -31,12 +25,6 @@ public class KafkaConsumerConfig {
|
|||||||
@Value(value = "${spring.kafka.bootstrap-servers}")
|
@Value(value = "${spring.kafka.bootstrap-servers}")
|
||||||
private String bootstrapAddress;
|
private String bootstrapAddress;
|
||||||
|
|
||||||
@Value(value = "${kafka.backoff.interval}")
|
|
||||||
private Long interval;
|
|
||||||
|
|
||||||
@Value(value = "${kafka.backoff.max_failure}")
|
|
||||||
private Long maxAttempts;
|
|
||||||
|
|
||||||
public ConsumerFactory<String, String> consumerFactory(String groupId) {
|
public ConsumerFactory<String, String> consumerFactory(String groupId) {
|
||||||
Map<String, Object> props = new HashMap<>();
|
Map<String, Object> props = new HashMap<>();
|
||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
@ -83,7 +71,7 @@ public class KafkaConsumerConfig {
|
|||||||
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
|
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
|
||||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
|
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
|
||||||
factory.setRecordFilterStrategy(record -> record.value()
|
factory.setRecordFilterStrategy(record -> record.value()
|
||||||
.contains("World"));
|
.contains("World"));
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +83,7 @@ public class KafkaConsumerConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
|
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
|
||||||
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
factory.setConsumerFactory(greetingConsumerFactory());
|
factory.setConsumerFactory(greetingConsumerFactory());
|
||||||
return factory;
|
return factory;
|
||||||
@ -121,32 +109,15 @@ public class KafkaConsumerConfig {
|
|||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
|
|
||||||
return new DefaultKafkaConsumerFactory<>(props);
|
return new DefaultKafkaConsumerFactory<>(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@Primary
|
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
|
||||||
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
|
|
||||||
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
factory.setConsumerFactory(multiTypeConsumerFactory());
|
factory.setConsumerFactory(multiTypeConsumerFactory());
|
||||||
factory.setMessageConverter(multiTypeConverter());
|
factory.setMessageConverter(multiTypeConverter());
|
||||||
factory.setCommonErrorHandler(errorHandler());
|
|
||||||
factory.getContainerProperties()
|
|
||||||
.setAckMode(ContainerProperties.AckMode.RECORD);
|
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
|
||||||
public DefaultErrorHandler errorHandler() {
|
|
||||||
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
|
|
||||||
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
|
|
||||||
System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
|
|
||||||
}, fixedBackOff);
|
|
||||||
//Commented because of the test
|
|
||||||
//errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class);
|
|
||||||
errorHandler.addNotRetryableExceptions(NullPointerException.class);
|
|
||||||
return errorHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ package com.baeldung.spring.kafka;
|
|||||||
|
|
||||||
import org.springframework.kafka.annotation.KafkaHandler;
|
import org.springframework.kafka.annotation.KafkaHandler;
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
import org.springframework.messaging.MessagingException;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
@ -10,12 +9,7 @@ import org.springframework.stereotype.Component;
|
|||||||
public class MultiTypeKafkaListener {
|
public class MultiTypeKafkaListener {
|
||||||
|
|
||||||
@KafkaHandler
|
@KafkaHandler
|
||||||
//@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
|
|
||||||
public void handleGreeting(Greeting greeting) {
|
public void handleGreeting(Greeting greeting) {
|
||||||
if (greeting.getName()
|
|
||||||
.equalsIgnoreCase("test")) {
|
|
||||||
throw new MessagingException("test not allowed");
|
|
||||||
}
|
|
||||||
System.out.println("Greeting received: " + greeting);
|
System.out.println("Greeting received: " + greeting);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,7 +14,4 @@ monitor.producer.simulate=true
|
|||||||
monitor.consumer.simulate=true
|
monitor.consumer.simulate=true
|
||||||
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
|
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
|
||||||
test.topic=testtopic1
|
test.topic=testtopic1
|
||||||
kafka.backoff.interval=9000
|
|
||||||
kafka.backoff.max_failure=5
|
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user