[JAVA-29500] Moved article "Implementing Retry in Kafka Consumer" to spring-kafka (#15904)
This commit is contained in:
parent
a0ebe67b38
commit
d6db20dd02
|
@ -4,7 +4,6 @@ This module contains articles about Spring with Kafka
|
||||||
|
|
||||||
### Relevant articles
|
### Relevant articles
|
||||||
|
|
||||||
- [Implementing Retry in Kafka Consumer](https://www.baeldung.com/spring-retry-kafka-consumer)
|
|
||||||
- [Spring Kafka: Configure Multiple Listeners on Same Topic](https://www.baeldung.com/spring-kafka-multiple-listeners-same-topic)
|
- [Spring Kafka: Configure Multiple Listeners on Same Topic](https://www.baeldung.com/spring-kafka-multiple-listeners-same-topic)
|
||||||
- [Understanding Kafka Topics and Partitions](https://www.baeldung.com/kafka-topics-partitions)
|
- [Understanding Kafka Topics and Partitions](https://www.baeldung.com/kafka-topics-partitions)
|
||||||
- [How to Subscribe a Kafka Consumer to Multiple Topics](https://www.baeldung.com/kafka-subscribe-consumer-multiple-topics)
|
- [How to Subscribe a Kafka Consumer to Multiple Topics](https://www.baeldung.com/kafka-subscribe-consumer-multiple-topics)
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
## Relevant Articles
|
|
||||||
- [Spring Kafka Trusted Packages Feature](https://www.baeldung.com/spring-kafka-trusted-packages-feature)
|
|
|
@ -1,34 +0,0 @@
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
<artifactId>spring-kafka-3_</artifactId>
|
|
||||||
<packaging>jar</packaging>
|
|
||||||
<name>spring-kafka-3_</name>
|
|
||||||
|
|
||||||
<parent>
|
|
||||||
<groupId>com.baeldung</groupId>
|
|
||||||
<artifactId>parent-boot-2</artifactId>
|
|
||||||
<version>0.0.1-SNAPSHOT</version>
|
|
||||||
<relativePath>../parent-boot-2</relativePath>
|
|
||||||
</parent>
|
|
||||||
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-databind</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka-test</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
<properties>
|
|
||||||
<kafka-version>3.0.12</kafka-version>
|
|
||||||
</properties>
|
|
||||||
</project>
|
|
|
@ -1,37 +0,0 @@
|
||||||
package com.baeldung.spring.kafka;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
|
||||||
|
|
||||||
public class SomeData {
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
private String type;
|
|
||||||
private String status;
|
|
||||||
private Instant timestamp;
|
|
||||||
|
|
||||||
public SomeData() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public SomeData(String id, String type, String status, Instant timestamp) {
|
|
||||||
this.id = id;
|
|
||||||
this.type = type;
|
|
||||||
this.status = status;
|
|
||||||
this.timestamp = timestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getType() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getStatus() {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Instant getTimestamp() {
|
|
||||||
return timestamp;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,42 +0,0 @@
|
||||||
package com.baeldung.spring.kafka;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
||||||
import org.springframework.kafka.core.ConsumerFactory;
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|
||||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class ListenerConfiguration {
|
|
||||||
|
|
||||||
@Bean("messageListenerContainer")
|
|
||||||
public ConcurrentKafkaListenerContainerFactory<String, SomeData> messageListenerContainer() {
|
|
||||||
ConcurrentKafkaListenerContainerFactory<String, SomeData> container = new ConcurrentKafkaListenerContainerFactory<>();
|
|
||||||
container.setConsumerFactory(someDataConsumerFactory());
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ConsumerFactory<String, SomeData> someDataConsumerFactory() {
|
|
||||||
JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
|
|
||||||
payloadJsonDeserializer.trustedPackages("com.baeldung.spring.kafka");
|
|
||||||
return new DefaultKafkaConsumerFactory<>(
|
|
||||||
consumerConfigs(),
|
|
||||||
new StringDeserializer(),
|
|
||||||
payloadJsonDeserializer
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Map<String, Object> consumerConfigs() {
|
|
||||||
return Map.of(
|
|
||||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092",
|
|
||||||
ConsumerConfig.GROUP_ID_CONFIG, "some-group-id"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
package com.baeldung.spring.kafka;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
|
||||||
import org.springframework.kafka.core.ProducerFactory;
|
|
||||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
|
||||||
import org.springframework.kafka.support.serializer.StringOrBytesSerializer;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class ProducerConfiguration {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaTemplate<Object, SomeData> kafkaTemplate() {
|
|
||||||
return new KafkaTemplate<>(producerFactory());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ProducerFactory<Object, SomeData> producerFactory() {
|
|
||||||
JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>();
|
|
||||||
jsonSerializer.setAddTypeInfo(true);
|
|
||||||
return new DefaultKafkaProducerFactory<>(
|
|
||||||
producerFactoryConfig(),
|
|
||||||
new StringOrBytesSerializer(),
|
|
||||||
jsonSerializer
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Map<String, Object> producerFactoryConfig() {
|
|
||||||
return Map.of(
|
|
||||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
package com.baeldung.spring.kafka;
|
|
||||||
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This test requires a running instance of kafka to be present
|
|
||||||
*/
|
|
||||||
@SpringBootTest
|
|
||||||
public class TrustedPackagesLiveTest {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private KafkaTemplate<Object, SomeData> kafkaTemplate;
|
|
||||||
|
|
||||||
@SpyBean
|
|
||||||
TestConsumer testConsumer;
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void givenMessageInTheTopic_whenTypeInfoPackageIsTrusted_thenMessageIsSuccessfullyConsumed() throws InterruptedException {
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
|
|
||||||
Mockito.doAnswer(invocationOnMock -> {
|
|
||||||
try {
|
|
||||||
latch.countDown();
|
|
||||||
return invocationOnMock.callRealMethod();
|
|
||||||
} catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).when(testConsumer).onMessage(Mockito.any());
|
|
||||||
|
|
||||||
SomeData someData = new SomeData("1", "active", "sent", Instant.now());
|
|
||||||
kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData));
|
|
||||||
|
|
||||||
Assertions.assertTrue(latch.await(20L, TimeUnit.SECONDS));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Component
|
|
||||||
static class TestConsumer {
|
|
||||||
|
|
||||||
@KafkaListener(topics = "sourceTopic", containerFactory = "messageListenerContainer")
|
|
||||||
public void onMessage(SomeData someData) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -12,7 +12,7 @@ This module contains articles about Spring with Kafka
|
||||||
- [Kafka Streams With Spring Boot](https://www.baeldung.com/spring-boot-kafka-streams)
|
- [Kafka Streams With Spring Boot](https://www.baeldung.com/spring-boot-kafka-streams)
|
||||||
- [Get the Number of Messages in an Apache Kafka Topic](https://www.baeldung.com/java-kafka-count-topic-messages)
|
- [Get the Number of Messages in an Apache Kafka Topic](https://www.baeldung.com/java-kafka-count-topic-messages)
|
||||||
- [Sending Data to a Specific Partition in Kafka](https://www.baeldung.com/kafka-send-data-partition)
|
- [Sending Data to a Specific Partition in Kafka](https://www.baeldung.com/kafka-send-data-partition)
|
||||||
|
- [Implementing Retry in Kafka Consumer](https://www.baeldung.com/spring-retry-kafka-consumer)
|
||||||
### Intro
|
### Intro
|
||||||
|
|
||||||
This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka.
|
This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka.
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
public class Farewell {
|
public class Farewell {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
public class Greeting {
|
public class Greeting {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -129,7 +129,6 @@ public class KafkaConsumerConfig {
|
||||||
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
|
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
|
||||||
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
factory.setConsumerFactory(multiTypeConsumerFactory());
|
factory.setConsumerFactory(multiTypeConsumerFactory());
|
||||||
factory.setMessageConverter(multiTypeConverter());
|
|
||||||
factory.setCommonErrorHandler(errorHandler());
|
factory.setCommonErrorHandler(errorHandler());
|
||||||
factory.getContainerProperties()
|
factory.getContainerProperties()
|
||||||
.setAckMode(ContainerProperties.AckMode.RECORD);
|
.setAckMode(ContainerProperties.AckMode.RECORD);
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import org.springframework.kafka.annotation.KafkaHandler;
|
import org.springframework.kafka.annotation.KafkaHandler;
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
@ -1,4 +1,4 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
@ -1,12 +1,11 @@
|
||||||
package com.baeldung.spring.kafka.retryable;
|
package com.baeldung.kafka.retryable;
|
||||||
|
|
||||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
@ -14,20 +13,15 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
|
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
|
||||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
|
||||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||||
|
|
||||||
import com.baeldung.spring.kafka.retryable.Greeting;
|
|
||||||
import com.baeldung.spring.kafka.retryable.RetryableApplicationKafkaApp;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import org.springframework.test.context.ActiveProfiles;
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
@SpringBootTest(classes = RetryableApplicationKafkaApp.class)
|
@SpringBootTest(classes = RetryableApplicationKafkaApp.class)
|
||||||
@EmbeddedKafka(partitions = 1, controlledShutdown = true, brokerProperties = { "listeners=PLAINTEXT://localhost:9093", "port=9093" })
|
@EmbeddedKafka(partitions = 1, controlledShutdown = true, brokerProperties = { "listeners=PLAINTEXT://localhost:9093", "port=9093" })
|
||||||
@ActiveProfiles("retry")
|
@ActiveProfiles("retry")
|
||||||
public class KafkaRetryableIntegrationTest {
|
public class KafkaRetryableIntegrationTest {
|
||||||
@ClassRule
|
|
||||||
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaListenerEndpointRegistry registry;
|
private KafkaListenerEndpointRegistry registry;
|
||||||
|
@ -41,9 +35,9 @@ public class KafkaRetryableIntegrationTest {
|
||||||
|
|
||||||
private static final String TOPIC = "topic";
|
private static final String TOPIC = "topic";
|
||||||
|
|
||||||
@Before
|
@BeforeEach
|
||||||
public void setup() {
|
public void setup() {
|
||||||
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
|
System.setProperty("spring.kafka.bootstrap-servers", "localhost:9093");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
Loading…
Reference in New Issue