Merge pull request #15667 from etrandafir93/features/BAEL-7202-handle_kafka_deser_error
BAEL-7202: handling kafka deser errors
This commit is contained in:
commit
e6e071d983
|
@ -15,6 +15,10 @@
|
||||||
<name>spring-kafka-3</name>
|
<name>spring-kafka-3</name>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.kafka</groupId>
|
<groupId>org.springframework.kafka</groupId>
|
||||||
<artifactId>spring-kafka</artifactId>
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
@ -28,9 +32,30 @@
|
||||||
<artifactId>spring-kafka-test</artifactId>
|
<artifactId>spring-kafka-test</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>junit-jupiter</artifactId>
|
||||||
|
<version>${testcontainers.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.testcontainers</groupId>
|
||||||
|
<artifactId>kafka</artifactId>
|
||||||
|
<version>${testcontainers.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<version>${awaitility.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
<java.version>17</java.version>
|
||||||
<kafka-version>3.0.12</kafka-version>
|
<kafka-version>3.0.12</kafka-version>
|
||||||
|
<testcontainers.version>1.19.3</testcontainers.version>
|
||||||
|
<awaitility.version>4.2.0</awaitility.version>
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class Application {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(Application.class, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
public record ArticlePublishedEvent(String article) {
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ArticlesPublishedListener {
|
||||||
|
private static final Logger log = Logger.getLogger(ArticlesPublishedListener.class.getName());
|
||||||
|
private final EmailService emailService;
|
||||||
|
|
||||||
|
public ArticlesPublishedListener(EmailService emailService) {
|
||||||
|
this.emailService = emailService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaListener(topics = "baeldung.articles.published")
|
||||||
|
public void onArticlePublished(ArticlePublishedEvent event) {
|
||||||
|
log.info("Received event published event: " + event);
|
||||||
|
emailService.sendNewsletter(event.article());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class EmailService {
|
||||||
|
private final static Logger log = Logger.getLogger(EmailService.class.getName());
|
||||||
|
private final List<String> articles = new ArrayList<>();
|
||||||
|
|
||||||
|
public void sendNewsletter(String article) {
|
||||||
|
log.info("Sending newsletter for article: " + article);
|
||||||
|
articles.add(article);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getArticles() {
|
||||||
|
return articles;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
|
||||||
|
@EnableKafka
|
||||||
|
@Configuration
|
||||||
|
class KafkaConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory(
|
||||||
|
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
|
||||||
|
) {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1");
|
||||||
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||||
|
return new DefaultKafkaConsumerFactory<>(
|
||||||
|
props,
|
||||||
|
new StringDeserializer(),
|
||||||
|
new JsonDeserializer<>(ArticlePublishedEvent.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
|
||||||
|
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory,
|
||||||
|
CommonErrorHandler commonErrorHandler
|
||||||
|
) {
|
||||||
|
var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
|
||||||
|
factory.setConsumerFactory(consumerFactory);
|
||||||
|
factory.setCommonErrorHandler(commonErrorHandler);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
CommonErrorHandler kafkaErrorHandler() {
|
||||||
|
return new KafkaErrorHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.Consumer;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
import org.apache.kafka.common.errors.RecordDeserializationException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
||||||
|
import org.springframework.kafka.listener.MessageListenerContainer;
|
||||||
|
|
||||||
|
class KafkaErrorHandler implements CommonErrorHandler {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleRecord(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
|
||||||
|
handle(exception, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleOtherException(Exception exception, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
|
||||||
|
handle(exception, consumer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handle(Exception exception, Consumer<?, ?> consumer) {
|
||||||
|
log.error("Exception thrown", exception);
|
||||||
|
if (exception instanceof RecordDeserializationException ex) {
|
||||||
|
consumer.seek(ex.topicPartition(), ex.offset() + 1L);
|
||||||
|
consumer.commitSync();
|
||||||
|
} else {
|
||||||
|
log.error("Exception not handled", exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.baeldung.spring.kafka.trusted.packages;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
public class SomeData {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private String type;
|
||||||
|
private String status;
|
||||||
|
private Instant timestamp;
|
||||||
|
|
||||||
|
public SomeData() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public SomeData(String id, String type, String status, Instant timestamp) {
|
||||||
|
this.id = id;
|
||||||
|
this.type = type;
|
||||||
|
this.status = status;
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Instant getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,96 @@
|
||||||
|
package com.baeldung.spring.kafka.deserialization.exception;
|
||||||
|
|
||||||
|
import static java.time.Duration.ofMillis;
|
||||||
|
import static java.time.Duration.ofSeconds;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.test.context.DynamicPropertyRegistry;
|
||||||
|
import org.springframework.test.context.DynamicPropertySource;
|
||||||
|
import org.testcontainers.containers.KafkaContainer;
|
||||||
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
import org.testcontainers.utility.DockerImageName;
|
||||||
|
|
||||||
|
@Testcontainers
|
||||||
|
@SpringBootTest(classes = Application.class)
|
||||||
|
class DeserializationExceptionLiveTest {
|
||||||
|
|
||||||
|
@Container
|
||||||
|
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
||||||
|
|
||||||
|
private static KafkaProducer<String, String> testKafkaProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private EmailService emailService;
|
||||||
|
|
||||||
|
@DynamicPropertySource
|
||||||
|
static void setProps(DynamicPropertyRegistry registry) {
|
||||||
|
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void beforeAll() {
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
|
||||||
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||||
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
||||||
|
testKafkaProducer = new KafkaProducer<>(props);
|
||||||
|
|
||||||
|
Awaitility.setDefaultTimeout(ofSeconds(5));
|
||||||
|
Awaitility.setDefaultPollInterval(ofMillis(50));
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void beforeEach() {
|
||||||
|
emailService.getArticles().clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenPublishingInvalidArticleEvent_thenHandleExceptionAndContinueProcessing() {
|
||||||
|
publishArticle("{ \"article\": \"Introduction to Kafka\" }");
|
||||||
|
publishArticle(" !! Invalid JSON !! ");
|
||||||
|
publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }");
|
||||||
|
|
||||||
|
await().untilAsserted(() -> assertThat(emailService.getArticles())
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
"Introduction to Kafka",
|
||||||
|
"Kafka Streams Tutorial"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenPublishingValidArticleEvent_thenProcessWithoutErrors() {
|
||||||
|
publishArticle("{ \"article\": \"Kotlin for Java Developers\" }");
|
||||||
|
publishArticle("{ \"article\": \"The S.O.L.I.D. Principles\" }");
|
||||||
|
|
||||||
|
await().untilAsserted(() -> assertThat(emailService.getArticles())
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
"Kotlin for Java Developers",
|
||||||
|
"The S.O.L.I.D. Principles"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void publishArticle(String jsonBody) {
|
||||||
|
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung.articles.published", jsonBody);
|
||||||
|
try {
|
||||||
|
testKafkaProducer.send(record).get();
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
package com.baeldung.spring.kafka.trusted.packages;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class ListenerConfiguration {
|
||||||
|
|
||||||
|
@Bean("messageListenerContainer")
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, SomeData> messageListenerContainer() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, SomeData> container = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
container.setConsumerFactory(someDataConsumerFactory());
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, SomeData> someDataConsumerFactory() {
|
||||||
|
JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
|
||||||
|
payloadJsonDeserializer.trustedPackages("com.baeldung.spring.kafka");
|
||||||
|
return new DefaultKafkaConsumerFactory<>(
|
||||||
|
consumerConfigs(),
|
||||||
|
new StringDeserializer(),
|
||||||
|
payloadJsonDeserializer
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Map<String, Object> consumerConfigs() {
|
||||||
|
return Map.of(
|
||||||
|
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092",
|
||||||
|
ConsumerConfig.GROUP_ID_CONFIG, "some-group-id"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.baeldung.spring.kafka.trusted.packages;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||||
|
import org.springframework.kafka.support.serializer.StringOrBytesSerializer;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class ProducerConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<Object, SomeData> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<Object, SomeData> producerFactory() {
|
||||||
|
JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>();
|
||||||
|
jsonSerializer.setAddTypeInfo(true);
|
||||||
|
return new DefaultKafkaProducerFactory<>(
|
||||||
|
producerFactoryConfig(),
|
||||||
|
new StringOrBytesSerializer(),
|
||||||
|
jsonSerializer
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Map<String, Object> producerFactoryConfig() {
|
||||||
|
return Map.of(
|
||||||
|
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package com.baeldung.spring.kafka.trusted.packages;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test requires a running instance of kafka to be present
|
||||||
|
*/
|
||||||
|
@SpringBootTest
|
||||||
|
@Disabled("This test requires a running instance of kafka to be present - manually run it")
|
||||||
|
public class TrustedPackagesLiveTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private KafkaTemplate<Object, SomeData> kafkaTemplate;
|
||||||
|
|
||||||
|
@SpyBean
|
||||||
|
TestConsumer testConsumer;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenMessageInTheTopic_whenTypeInfoPackageIsTrusted_thenMessageIsSuccessfullyConsumed() throws InterruptedException {
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
Mockito.doAnswer(invocationOnMock -> {
|
||||||
|
try {
|
||||||
|
latch.countDown();
|
||||||
|
return invocationOnMock.callRealMethod();
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(testConsumer).onMessage(Mockito.any());
|
||||||
|
|
||||||
|
SomeData someData = new SomeData("1", "active", "sent", Instant.now());
|
||||||
|
kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData));
|
||||||
|
|
||||||
|
Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Component
|
||||||
|
static class TestConsumer {
|
||||||
|
|
||||||
|
@KafkaListener(topics = "sourceTopic", containerFactory = "messageListenerContainer")
|
||||||
|
public void onMessage(SomeData someData) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
## Relevant Articles
|
||||||
|
- [Spring Kafka Trusted Packages Feature](https://www.baeldung.com/spring-kafka-trusted-packages-feature)
|
|
@ -0,0 +1,36 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-boot-2</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../parent-boot-2</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>spring-kafka-3</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
<name>spring-kafka-3</name>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka-test</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<kafka-version>3.0.12</kafka-version>
|
||||||
|
</properties>
|
||||||
|
</project>
|
Loading…
Reference in New Issue