diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaConfig.java new file mode 100644 index 0000000000..6514816424 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaConfig.java @@ -0,0 +1,47 @@ +package com.baeldung.spring.kafka.groupId; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; + +@EnableKafka +@Configuration +class KafkaConfig { + + @Bean + ConsumerFactory consumerFactory(@Value("${spring.kafka.bootstrap-servers:localhost:9092}") String bootstrapServers) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "${kafka.consumer.groupId:test-consumer-group}"); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "rex"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer()); + } + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory, + CommonErrorHandler commonErrorHandler) { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(commonErrorHandler); + return factory; + } + + @Bean + CommonErrorHandler kafkaErrorHandler() { + return new KafkaErrorHandler(); + } + +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaErrorHandler.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaErrorHandler.java new file mode 100644 index 0000000000..c334925141 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/KafkaErrorHandler.java @@ -0,0 +1,30 @@ +package com.baeldung.spring.kafka.groupId; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.lang.NonNull; + +class KafkaErrorHandler implements CommonErrorHandler { + + private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandler.class); + + @Override + public void handleOtherException(@NonNull Exception exception, @NonNull Consumer consumer, @NonNull MessageListenerContainer container, + boolean batchListener) { + handle(exception, consumer); + } + + private void handle(Exception exception, Consumer consumer) { + log.error("Exception thrown", exception); + if (exception instanceof RecordDeserializationException ex) { + consumer.seek(ex.topicPartition(), ex.offset() + 1L); + consumer.commitSync(); + } else { + log.error("Exception not handled", exception); + } + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/Main.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/Main.java new file mode 100644 index 0000000000..589b20c68f --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/Main.java @@ -0,0 +1,14 @@ +package com.baeldung.spring.kafka.groupId; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +@SpringBootApplication +@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId") +public class Main { + + public static void main(String[] args) { + SpringApplication.run(Main.class, args); + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaConsumer.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaConsumer.java new file mode 100644 index 0000000000..f26b605a7b --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaConsumer.java @@ -0,0 +1,41 @@ +package com.baeldung.spring.kafka.groupId; + +import java.util.concurrent.CountDownLatch; + +import org.apache.kafka.clients.consumer.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +public class MyKafkaConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaConsumer.class); + + private CountDownLatch latch = new CountDownLatch(1); + + private String payload; + + @KafkaListener(topics = "${kafka.topic.name:test-topic}", clientIdPrefix = "neo", groupId = "${kafka.consumer.groupId:test-consumer-group}", concurrency = "4") + public void receive(@Payload String payload, Consumer consumer) { + LOGGER.info("Consumer='{}' received payload='{}'", consumer.groupMetadata() + .memberId(), payload); + this.payload = payload; + + latch.countDown(); + } + + public CountDownLatch getLatch() { + return latch; + } + + public void resetLatch() { + latch = new CountDownLatch(1); + } + + public String getPayload() { + return payload; + } +} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaProducer.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaProducer.java new file mode 100644 index 0000000000..f8642a99a5 --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/groupId/MyKafkaProducer.java @@ -0,0 +1,24 @@ +package com.baeldung.spring.kafka.groupId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class MyKafkaProducer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaProducer.class); + + @Value("${kafka.topic.name:test-topic}") + private String topic; + @Autowired + private KafkaTemplate kafkaTemplate; + + public void send(String payload) { + LOGGER.info("Sending payload='{}' to topic='{}'", payload, topic); + kafkaTemplate.send(topic, payload); + } +} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/groupId/MainLiveTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/groupId/MainLiveTest.java new file mode 100644 index 0000000000..2c48d25ca2 --- /dev/null +++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/groupId/MainLiveTest.java @@ -0,0 +1,44 @@ +package com.baeldung.spring.kafka.groupId; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; + +@SpringBootTest(classes = Main.class) +@ActiveProfiles("groupId") +@ComponentScan(basePackages = "com.baeldung.spring.kafka.groupId") +@DirtiesContext +@EmbeddedKafka(partitions = 4, topics = { "${kafka.topic.name:test-topic}" }, brokerProperties = { "listeners=PLAINTEXT://localhost:8000", "port=8000" }) +public class MainLiveTest { + + @Autowired + private MyKafkaConsumer consumer; + @Autowired + private MyKafkaProducer producer; + + @BeforeEach + void setup() { + consumer.resetLatch(); + } + + @Test + public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception { + String data = "Test 123..."; + producer.send(data); + boolean messageConsumed = consumer.getLatch() + .await(10, TimeUnit.SECONDS); + assertTrue(messageConsumed); + assertThat(consumer.getPayload(), containsString(data)); + } +} diff --git a/spring-kafka-3/src/test/resources/application-groupId.properties b/spring-kafka-3/src/test/resources/application-groupId.properties new file mode 100644 index 0000000000..648ae1b603 --- /dev/null +++ b/spring-kafka-3/src/test/resources/application-groupId.properties @@ -0,0 +1 @@ +spring.kafka.bootstrap-servers=localhost:8000