From e50e8e96a0931fca35ff41e447f540cfc5b5a7e5 Mon Sep 17 00:00:00 2001
From: balasr3
Date: Sat, 24 Feb 2024 16:52:26 +0000
Subject: [PATCH 1/3] BAEL-7272: Removed CodecCustomizer config, which is not mandatory for global serialization/deserialization config

---
 .../config/CodecCustomizerConfig.java | 27 -------------------
 1 file changed, 27 deletions(-)
 delete mode 100644 spring-reactive-modules/spring-reactive-3/src/main/java/com/baeldung/custom/deserialization/config/CodecCustomizerConfig.java

diff --git a/spring-reactive-modules/spring-reactive-3/src/main/java/com/baeldung/custom/deserialization/config/CodecCustomizerConfig.java b/spring-reactive-modules/spring-reactive-3/src/main/java/com/baeldung/custom/deserialization/config/CodecCustomizerConfig.java
deleted file mode 100644
index ef3eb1e97f..0000000000
--- a/spring-reactive-modules/spring-reactive-3/src/main/java/com/baeldung/custom/deserialization/config/CodecCustomizerConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.baeldung.custom.deserialization.config;
-
-import org.springframework.boot.web.codec.CodecCustomizer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.http.MediaType;
-import org.springframework.http.codec.CodecConfigurer;
-import org.springframework.http.codec.json.Jackson2JsonDecoder;
-import org.springframework.http.codec.json.Jackson2JsonEncoder;
-import org.springframework.util.MimeType;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-@Configuration
-public class CodecCustomizerConfig {
-
-    @Bean
-    public CodecCustomizer codecCustomizer(ObjectMapper customObjectMapper) {
-        return configurer -> {
-            MimeType mimeType = MimeType.valueOf(MediaType.APPLICATION_JSON_VALUE);
-            CodecConfigurer.CustomCodecs customCodecs = configurer.customCodecs();
-            customCodecs.register(new Jackson2JsonDecoder(customObjectMapper, mimeType));
-            customCodecs.register(new Jackson2JsonEncoder(customObjectMapper, mimeType));
-        };
-    }
-
-}

From 98fcb6edefa15e3ffda8c5649f324b76386334bf Mon Sep 17 00:00:00 2001
From: balasr3
Date: Tue, 26 Mar 2024 23:43:48 +0000
Subject: [PATCH 2/3] BAEL-7593: added implementation and test for kafka delayed message consumption

---
 spring-kafka-3/pom.xml                        |  12 ++
 .../delay/DelayedMessageListenerAdapter.java  |  93 ++++++++++++++
 .../kafka/delay/KafkaConsumerConfig.java      |  64 ++++++++++
 .../kafka/delay/KafkaDelayApplication.java    |  13 ++
 .../baeldung/spring/kafka/delay/Order.java    |  27 ++++
 .../spring/kafka/delay/OrderListener.java     |  37 ++++++
 .../spring/kafka/delay/OrderService.java      |  31 +++++
 .../delay/KafkaDelayIntegrationTest.java      | 116 ++++++++++++++++++
 8 files changed, 393 insertions(+)
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java
 create mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java
 create mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java

diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml
index 894eab2576..2d6def9c2c 100644
---
a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -22,6 +22,7 @@ org.springframework.kafka spring-kafka + 2.9.13 com.fasterxml.jackson.core @@ -50,6 +51,16 @@ ${awaitility.version} test + + org.projectlombok + lombok + ${lombok.version} + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson-datatype.version} + @@ -57,5 +68,6 @@ 3.0.12 1.19.3 4.2.0 + 2.13.5 diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java new file mode 100644 index 0000000000..339f18677a --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java @@ -0,0 +1,93 @@ +package com.baeldung.spring.kafka.delay; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.KafkaBackoffException; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; +import org.springframework.kafka.support.Acknowledgment; + +public class DelayedMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener { + + private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS); + + private final String listenerId; + + private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager; + + private final Map delaysPerTopic = new ConcurrentHashMap<>(); + + private Duration defaultDelay = DEFAULT_DELAY_VALUE; + + public DelayedMessageListenerAdapter(MessageListener delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) { + super(delegate); + Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null"); + Objects.requireNonNull(listenerId, "listenerId cannot be null"); + this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager; + this.listenerId = listenerId; + } + + @Override + public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) throws KafkaBackoffException { + this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay) + .toMillis(), consumer)); + invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer); + } + + public void setDelayForTopic(String topic, Duration delay) { + Objects.requireNonNull(topic, "Topic cannot be null"); + Objects.requireNonNull(delay, "Delay cannot be null"); + this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); + this.delaysPerTopic.put(topic, delay); + } + + public void setDefaultDelay(Duration delay) { + Objects.requireNonNull(delay, "Delay cannot be null"); + this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); + this.defaultDelay = delay; + } + + private void 
invokeDelegateOnMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) { + switch (this.delegateType) { + case ACKNOWLEDGING_CONSUMER_AWARE: + this.delegate.onMessage(consumerRecord, acknowledgment, consumer); + break; + case ACKNOWLEDGING: + this.delegate.onMessage(consumerRecord, acknowledgment); + break; + case CONSUMER_AWARE: + this.delegate.onMessage(consumerRecord, consumer); + break; + case SIMPLE: + this.delegate.onMessage(consumerRecord); + } + } + + private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord data, long nextExecutionTimestamp, Consumer consumer) { + return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer); + } + + @Override + public void onMessage(ConsumerRecord data) { + onMessage(data, null, null); + } + + @Override + public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { + onMessage(data, acknowledgment, null); + } + + @Override + public void onMessage(ConsumerRecord data, Consumer consumer) { + onMessage(data, null, consumer); + } +} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java new file mode 100644 index 0000000000..f84b7d3f2a --- /dev/null +++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java @@ -0,0 +1,64 @@ +package com.baeldung.spring.kafka.delay; + +import java.time.Duration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager; +import org.springframework.kafka.listener.ContainerPausingBackOffHandler; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaConsumerBackoffManager; +import org.springframework.kafka.listener.ListenerContainerPauseService; +import org.springframework.kafka.listener.ListenerContainerRegistry; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +@Configuration +public class KafkaConsumerConfig { + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler); + factory.getContainerProperties() + .setAckMode(ContainerProperties.AckMode.RECORD); + factory.setContainerCustomizer(container -> { + DelayedMessageListenerAdapter delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container); + delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10)); + 
+            delayedAdapter.setDefaultDelay(Duration.ZERO);
+            container.setupMessageListener(delayedAdapter);
+        });
+        return factory;
+    }
+
+    @Bean
+    public ObjectMapper objectMapper() {
+        return new ObjectMapper().registerModule(new JavaTimeModule())
+            .configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
+    }
+
+    @Bean
+    public TaskScheduler taskScheduler() {
+        return new ThreadPoolTaskScheduler();
+    }
+
+    @SuppressWarnings("unchecked")
+    private DelayedMessageListenerAdapter wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer container) {
+        return new DelayedMessageListenerAdapter<>((MessageListener) container.getContainerProperties()
+            .getMessageListener(), backOffManager, container.getListenerId());
+    }
+
+    private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
+        return new ContainerPartitionPausingBackOffManager(registry, new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
+    }
+
+}
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java
new file mode 100644
index 0000000000..6303c045bc
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.spring.kafka.delay;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.annotation.EnableKafka;
+
+@EnableKafka
+@SpringBootApplication
+public class KafkaDelayApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaDelayApplication.class, args);
+    }
+}
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java
new file mode 100644
index 0000000000..e781e76466
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java
@@ -0,0 +1,27 @@
+package com.baeldung.spring.kafka.delay;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class Order {
+
+    private UUID orderId;
+
+    private LocalDateTime orderGeneratedDateTime;
+
+    private LocalDateTime orderProcessedTime;
+
+    private List<String> address;
+
+    private double price;
+}
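Note on the delay mechanism wired up in KafkaConsumerConfig above: the adapter compares each record's broker timestamp plus the per-topic delay against the current time; if the due time is still in the future, backOffIfNecessary raises KafkaBackoffException and the backoff manager pauses that partition until the record becomes due. A minimal, self-contained sketch of that arithmetic follows; the DelayCalculator class and its method names are illustrative only and are not part of the patch or of Spring Kafka.

package com.baeldung.spring.kafka.delay;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: mirrors the due-time check performed by DelayedMessageListenerAdapter.
public class DelayCalculator {

    private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();
    private final Duration defaultDelay;

    public DelayCalculator(Duration defaultDelay) {
        this.defaultDelay = defaultDelay;
    }

    public void setDelayForTopic(String topic, Duration delay) {
        delaysPerTopic.put(topic, delay);
    }

    // recordTimestampMillis corresponds to ConsumerRecord#timestamp()
    public Instant dueTime(String topic, long recordTimestampMillis) {
        return Instant.ofEpochMilli(recordTimestampMillis)
            .plus(delaysPerTopic.getOrDefault(topic, defaultDelay));
    }

    // true means the record is not yet due and consumption should back off
    public boolean shouldBackOff(String topic, long recordTimestampMillis, Instant now) {
        return now.isBefore(dueTime(topic, recordTimestampMillis));
    }
}

For a record published to web.orders at 12:00:00 with the 10-second delay configured above, dueTime returns 12:00:10, so any poll before that instant backs off and the record is redelivered once the pause expires.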
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java
new file mode 100644
index 0000000000..3c5c4a69e5
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java
@@ -0,0 +1,37 @@
+package com.baeldung.spring.kafka.delay;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.RetryableTopic;
+import org.springframework.kafka.listener.KafkaBackoffException;
+import org.springframework.kafka.retrytopic.DltStrategy;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+public class OrderListener {
+
+    private final OrderService orderService;
+
+    private final ObjectMapper objectMapper;
+
+    public OrderListener(OrderService orderService, ObjectMapper objectMapper) {
+        this.orderService = orderService;
+        this.objectMapper = objectMapper;
+    }
+
+    @RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
+    @KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
+    public void handleOrders(String order) throws JsonProcessingException {
+        Order orderDetails = objectMapper.readValue(order, Order.class);
+        OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
+        if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
+            orderService.processOrder(orderDetails);
+        }
+    }
+
+}
diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java
new file mode 100644
index 0000000000..f91e67c1d1
--- /dev/null
+++ b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java
@@ -0,0 +1,31 @@
+package com.baeldung.spring.kafka.delay;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.springframework.stereotype.Service;
+
+@Service
+public class OrderService {
+
+    HashMap<UUID, Order> orders = new HashMap<>();
+
+    public Status findStatusById(UUID orderId) {
+        return Status.ORDER_CONFIRMED;
+    }
+
+    public void processOrder(Order order) {
+        order.setOrderProcessedTime(LocalDateTime.now());
+        orders.put(order.getOrderId(), order);
+    }
+
+    public Map<UUID, Order> getOrders() {
+        return orders;
+    }
+
+    enum Status {
+        CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED
+    }
+}
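For completeness, orders reach the web.orders topic as plain JSON strings; the integration test below publishes them with a raw KafkaProducer, and an application-side publisher could be as simple as the sketch that follows. The OrderProducer class and the injected KafkaTemplate bean are assumptions for illustration and are not part of the patch.

package com.baeldung.spring.kafka.delay;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical publisher: serializes an Order with the JavaTimeModule-aware ObjectMapper
// and sends it as a String payload, matching what OrderListener deserializes.
@Component
public class OrderProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public void publish(Order order) throws JsonProcessingException {
        kafkaTemplate.send("web.orders", order.getOrderId().toString(), objectMapper.writeValueAsString(order));
    }
}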
diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java
new file mode 100644
index 0000000000..8264e67bb2
--- /dev/null
+++ b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java
@@ -0,0 +1,116 @@
+package com.baeldung.spring.kafka.delay;
+
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+@Testcontainers
+@SpringBootTest(classes = KafkaDelayApplication.class)
+class KafkaDelayIntegrationTest {
+
+    @Container
+    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
+    private static KafkaProducer<String, String> testKafkaProducer;
+    private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
+        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+
+    @Autowired
+    OrderService orderService;
+
+    @DynamicPropertySource
+    static void setProps(DynamicPropertyRegistry registry) {
+        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
+    }
+
+    @BeforeAll
+    static void beforeAll() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        testKafkaProducer = new KafkaProducer<>(props);
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
+        // Given
+        var orderId = UUID.randomUUID();
+        Order order = Order.builder()
+            .orderId(orderId)
+            .price(1.0)
+            .orderGeneratedDateTime(LocalDateTime.now())
+            .address(List.of("41 Felix Avenue, Luton"))
+            .build();
+
+        String orderString = objectMapper.writeValueAsString(order);
+        ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
+        // When
+        testKafkaProducer.send(record)
+            .get();
+        await().atMost(Duration.ofSeconds(1800))
+            .until(() -> {
+                // Then
+                Map<UUID, Order> orders = orderService.getOrders();
+                return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
+                    .getOrderGeneratedDateTime(), orders.get(orderId)
+                    .getOrderProcessedTime())
+                    .getSeconds() >= 10;
+            });
+    }
+
+    @Test
+    void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
+        // Given
+        var orderId = UUID.randomUUID();
+        Order order = Order.builder()
+            .orderId(orderId)
+            .price(1.0)
+            .orderGeneratedDateTime(LocalDateTime.now())
+            .address(List.of("41 Felix Avenue, Luton"))
+            .build();
+
+        String orderString = objectMapper.writeValueAsString(order);
+        ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
+        // When
+        testKafkaProducer.send(record)
+            .get();
+        await().atMost(Duration.ofSeconds(1800))
+            .until(() -> {
+                // Then
+                Map<UUID, Order> orders = orderService.getOrders();
+                System.out.println("Time...." + Duration.between(orders.get(orderId)
+                    .getOrderGeneratedDateTime(), orders.get(orderId)
+                    .getOrderProcessedTime())
+                    .getSeconds());
+                return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
+                    .getOrderGeneratedDateTime(), orders.get(orderId)
+                    .getOrderProcessedTime())
+                    .getSeconds() <= 1;
+            });
+    }
+
+}
\ No newline at end of file

From 95b7834f6a21b5a40c9f4538de1b4af8b0800477 Mon Sep 17 00:00:00 2001
From: balasr3
Date: Thu, 28 Mar 2024 23:27:17 +0000
Subject: [PATCH 3/3] Revert "BAEL-7593: added implementation and test for kafka delayed message consumption"

This reverts commit 98fcb6edefa15e3ffda8c5649f324b76386334bf.
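One configuration detail worth noting about the reverted change: KafkaConsumerConfig injects a ConsumerFactory without defining one, relying on Spring Boot's auto-configuration from the spring.kafka.* properties (the test supplies spring.kafka.bootstrap-servers dynamically). An explicit equivalent, sketched here under the assumption of String keys and values since the listener deserializes the JSON payload itself, could look roughly like this; the class name and the localhost bootstrap address are illustrative.

package com.baeldung.spring.kafka.delay;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Sketch of the consumer factory that the reverted configuration otherwise obtains from Boot auto-configuration.
@Configuration
public class KafkaConsumerFactoryConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // placeholder address; tests override it via spring.kafka.bootstrap-servers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}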
--- spring-kafka-3/pom.xml | 12 -- .../delay/DelayedMessageListenerAdapter.java | 93 -------------- .../kafka/delay/KafkaConsumerConfig.java | 64 ---------- .../kafka/delay/KafkaDelayApplication.java | 13 -- .../baeldung/spring/kafka/delay/Order.java | 27 ---- .../spring/kafka/delay/OrderListener.java | 37 ------ .../spring/kafka/delay/OrderService.java | 31 ----- .../delay/KafkaDelayIntegrationTest.java | 116 ------------------ 8 files changed, 393 deletions(-) delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java delete mode 100644 spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java delete mode 100644 spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 2d6def9c2c..894eab2576 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -22,7 +22,6 @@ org.springframework.kafka spring-kafka - 2.9.13 com.fasterxml.jackson.core @@ -51,16 +50,6 @@ ${awaitility.version} test - - org.projectlombok - lombok - ${lombok.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson-datatype.version} - @@ -68,6 +57,5 @@ 3.0.12 1.19.3 4.2.0 - 2.13.5 diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java deleted file mode 100644 index 339f18677a..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; -import org.springframework.kafka.listener.KafkaBackoffException; -import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import org.springframework.kafka.listener.MessageListener; -import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; -import org.springframework.kafka.support.Acknowledgment; - -public class DelayedMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener { - - private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS); - - private final String listenerId; - - private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager; - - private final Map delaysPerTopic = new ConcurrentHashMap<>(); - - private Duration defaultDelay = DEFAULT_DELAY_VALUE; - - public DelayedMessageListenerAdapter(MessageListener delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) { - super(delegate); - 
Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null"); - Objects.requireNonNull(listenerId, "listenerId cannot be null"); - this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager; - this.listenerId = listenerId; - } - - @Override - public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) throws KafkaBackoffException { - this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay) - .toMillis(), consumer)); - invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer); - } - - public void setDelayForTopic(String topic, Duration delay) { - Objects.requireNonNull(topic, "Topic cannot be null"); - Objects.requireNonNull(delay, "Delay cannot be null"); - this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); - this.delaysPerTopic.put(topic, delay); - } - - public void setDefaultDelay(Duration delay) { - Objects.requireNonNull(delay, "Delay cannot be null"); - this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); - this.defaultDelay = delay; - } - - private void invokeDelegateOnMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) { - switch (this.delegateType) { - case ACKNOWLEDGING_CONSUMER_AWARE: - this.delegate.onMessage(consumerRecord, acknowledgment, consumer); - break; - case ACKNOWLEDGING: - this.delegate.onMessage(consumerRecord, acknowledgment); - break; - case CONSUMER_AWARE: - this.delegate.onMessage(consumerRecord, consumer); - break; - case SIMPLE: - this.delegate.onMessage(consumerRecord); - } - } - - private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord data, long nextExecutionTimestamp, Consumer consumer) { - return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer); - } - - @Override - public void onMessage(ConsumerRecord data) { - onMessage(data, null, null); - } - - @Override - public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { - onMessage(data, acknowledgment, null); - } - - @Override - public void onMessage(ConsumerRecord data, Consumer consumer) { - onMessage(data, null, consumer); - } -} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java deleted file mode 100644 index f84b7d3f2a..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.Duration; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager; -import org.springframework.kafka.listener.ContainerPausingBackOffHandler; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import 
org.springframework.kafka.listener.ListenerContainerPauseService; -import org.springframework.kafka.listener.ListenerContainerRegistry; -import org.springframework.kafka.listener.MessageListener; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -@Configuration -public class KafkaConsumerConfig { - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler); - factory.getContainerProperties() - .setAckMode(ContainerProperties.AckMode.RECORD); - factory.setContainerCustomizer(container -> { - DelayedMessageListenerAdapter delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container); - delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10)); - delayedAdapter.setDefaultDelay(Duration.ZERO); - container.setupMessageListener(delayedAdapter); - }); - return factory; - } - - @Bean - public ObjectMapper objectMapper() { - return new ObjectMapper().registerModule(new JavaTimeModule()) - .configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false); - } - - @Bean - public TaskScheduler taskScheduler() { - return new ThreadPoolTaskScheduler(); - } - - @SuppressWarnings("unchecked") - private DelayedMessageListenerAdapter wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer container) { - return new DelayedMessageListenerAdapter<>((MessageListener) container.getContainerProperties() - .getMessageListener(), backOffManager, container.getListenerId()); - } - - private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) { - return new ContainerPartitionPausingBackOffManager(registry, new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler))); - } - -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java deleted file mode 100644 index 6303c045bc..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.kafka.annotation.EnableKafka; - -@EnableKafka -@SpringBootApplication -public class KafkaDelayApplication { - public static void main(String[] args) { - SpringApplication.run(KafkaDelayApplication.class, args); - } -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java deleted file mode 100644 index e781e76466..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.spring.kafka.delay; - 
-import java.time.LocalDateTime; -import java.util.List; -import java.util.UUID; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class Order { - - private UUID orderId; - - private LocalDateTime orderGeneratedDateTime; - - private LocalDateTime orderProcessedTime; - - private List address; - - private double price; -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java deleted file mode 100644 index 3c5c4a69e5..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.annotation.RetryableTopic; -import org.springframework.kafka.listener.KafkaBackoffException; -import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.stereotype.Component; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import groovy.util.logging.Slf4j; - -@Slf4j -@Component -public class OrderListener { - - private final OrderService orderService; - - private final ObjectMapper objectMapper; - - public OrderListener(OrderService orderService, ObjectMapper objectMapper) { - this.orderService = orderService; - this.objectMapper = objectMapper; - } - - @RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT) - @KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders") - public void handleOrders(String order) throws JsonProcessingException { - Order orderDetails = objectMapper.readValue(order, Order.class); - OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId()); - if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) { - orderService.processOrder(orderDetails); - } - } - -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java deleted file mode 100644 index f91e67c1d1..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.springframework.stereotype.Service; - -@Service -public class OrderService { - - HashMap orders = new HashMap<>(); - - public Status findStatusById(UUID orderId) { - return Status.ORDER_CONFIRMED; - } - - public void processOrder(Order order) { - order.setOrderProcessedTime(LocalDateTime.now()); - orders.put(order.getOrderId(), order); - } - - public Map getOrders() { - return orders; - } - - enum Status { - CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED - } -} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java deleted file mode 100644 index 8264e67bb2..0000000000 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import 
static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -@Testcontainers -@SpringBootTest(classes = KafkaDelayApplication.class) -class KafkaDelayIntegrationTest { - - @Container - private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); - private static KafkaProducer testKafkaProducer; - private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - - @Autowired - OrderService orderService; - - @DynamicPropertySource - static void setProps(DynamicPropertyRegistry registry) { - registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); - } - - @BeforeAll - static void beforeAll() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - testKafkaProducer = new KafkaProducer<>(props); - } - - @Test - void givenKafkaBrokerExists_whenCreteOrderIsReceived_thenMessageShouldBeDelayed() throws Exception { - // Given - var orderId = UUID.randomUUID(); - Order order = Order.builder() - .orderId(orderId) - .price(1.0) - .orderGeneratedDateTime(LocalDateTime.now()) - .address(List.of("41 Felix Avenue, Luton")) - .build(); - - String orderString = objectMapper.writeValueAsString(order); - ProducerRecord record = new ProducerRecord<>("web.orders", orderString); - // When - testKafkaProducer.send(record) - .get(); - await().atMost(Duration.ofSeconds(1800)) - .until(() -> { - // then - Map orders = orderService.getOrders(); - return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds() >= 10; - }); - } - - @Test - void givenKafkaBrokerExists_whenCreteOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception { - // Given - var orderId = UUID.randomUUID(); - Order order = Order.builder() - .orderId(orderId) - .price(1.0) - .orderGeneratedDateTime(LocalDateTime.now()) - .address(List.of("41 Felix Avenue, Luton")) - .build(); - - String orderString = objectMapper.writeValueAsString(order); - ProducerRecord record = new 
ProducerRecord<>("web.internal.orders", orderString); - // When - testKafkaProducer.send(record) - .get(); - await().atMost(Duration.ofSeconds(1800)) - .until(() -> { - // Then - Map orders = orderService.getOrders(); - System.out.println("Time...." + Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds()); - return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds() <= 1; - }); - } - -} \ No newline at end of file