diff --git a/spring-kafka-3/pom.xml b/spring-kafka-3/pom.xml index 2d6def9c2c..894eab2576 100644 --- a/spring-kafka-3/pom.xml +++ b/spring-kafka-3/pom.xml @@ -22,7 +22,6 @@ org.springframework.kafka spring-kafka - 2.9.13 com.fasterxml.jackson.core @@ -51,16 +50,6 @@ ${awaitility.version} test - - org.projectlombok - lombok - ${lombok.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson-datatype.version} - @@ -68,6 +57,5 @@ 3.0.12 1.19.3 4.2.0 - 2.13.5 diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java deleted file mode 100644 index 339f18677a..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/DelayedMessageListenerAdapter.java +++ /dev/null @@ -1,93 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; -import org.springframework.kafka.listener.KafkaBackoffException; -import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import org.springframework.kafka.listener.MessageListener; -import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; -import org.springframework.kafka.support.Acknowledgment; - -public class DelayedMessageListenerAdapter extends AbstractDelegatingMessageListenerAdapter> implements AcknowledgingConsumerAwareMessageListener { - - private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS); - - private final String listenerId; - - private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager; - - private final Map delaysPerTopic = new ConcurrentHashMap<>(); - - private Duration defaultDelay = DEFAULT_DELAY_VALUE; - - public DelayedMessageListenerAdapter(MessageListener delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) { - super(delegate); - Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null"); - Objects.requireNonNull(listenerId, "listenerId cannot be null"); - this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager; - this.listenerId = listenerId; - } - - @Override - public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) throws KafkaBackoffException { - this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay) - .toMillis(), consumer)); - invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer); - } - - public void setDelayForTopic(String topic, Duration delay) { - Objects.requireNonNull(topic, "Topic cannot be null"); - Objects.requireNonNull(delay, "Delay cannot be null"); - this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); - this.delaysPerTopic.put(topic, delay); - } - - public void setDefaultDelay(Duration delay) { - Objects.requireNonNull(delay, "Delay cannot be null"); - this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId)); - this.defaultDelay = delay; - } - - private void invokeDelegateOnMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment, Consumer consumer) { - switch (this.delegateType) { - case ACKNOWLEDGING_CONSUMER_AWARE: - this.delegate.onMessage(consumerRecord, acknowledgment, consumer); - break; - case ACKNOWLEDGING: - this.delegate.onMessage(consumerRecord, acknowledgment); - break; - case CONSUMER_AWARE: - this.delegate.onMessage(consumerRecord, consumer); - break; - case SIMPLE: - this.delegate.onMessage(consumerRecord); - } - } - - private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord data, long nextExecutionTimestamp, Consumer consumer) { - return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer); - } - - @Override - public void onMessage(ConsumerRecord data) { - onMessage(data, null, null); - } - - @Override - public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { - onMessage(data, acknowledgment, null); - } - - @Override - public void onMessage(ConsumerRecord data, Consumer consumer) { - onMessage(data, null, consumer); - } -} \ No newline at end of file diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java deleted file mode 100644 index f84b7d3f2a..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaConsumerConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.Duration; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager; -import org.springframework.kafka.listener.ContainerPausingBackOffHandler; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import org.springframework.kafka.listener.ListenerContainerPauseService; -import org.springframework.kafka.listener.ListenerContainerRegistry; -import org.springframework.kafka.listener.MessageListener; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -@Configuration -public class KafkaConsumerConfig { - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler); - factory.getContainerProperties() - .setAckMode(ContainerProperties.AckMode.RECORD); - factory.setContainerCustomizer(container -> { - DelayedMessageListenerAdapter delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container); - delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10)); - delayedAdapter.setDefaultDelay(Duration.ZERO); - container.setupMessageListener(delayedAdapter); - }); - return factory; - } - - @Bean - public ObjectMapper objectMapper() { - return new ObjectMapper().registerModule(new JavaTimeModule()) - .configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false); - } - - @Bean - public TaskScheduler taskScheduler() { - return new ThreadPoolTaskScheduler(); - } - - @SuppressWarnings("unchecked") - private DelayedMessageListenerAdapter wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer container) { - return new DelayedMessageListenerAdapter<>((MessageListener) container.getContainerProperties() - .getMessageListener(), backOffManager, container.getListenerId()); - } - - private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) { - return new ContainerPartitionPausingBackOffManager(registry, new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler))); - } - -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java deleted file mode 100644 index 6303c045bc..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/KafkaDelayApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.kafka.annotation.EnableKafka; - -@EnableKafka -@SpringBootApplication -public class KafkaDelayApplication { - public static void main(String[] args) { - SpringApplication.run(KafkaDelayApplication.class, args); - } -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java deleted file mode 100644 index e781e76466..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/Order.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.LocalDateTime; -import java.util.List; -import java.util.UUID; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@AllArgsConstructor -@NoArgsConstructor -public class Order { - - private UUID orderId; - - private LocalDateTime orderGeneratedDateTime; - - private LocalDateTime orderProcessedTime; - - private List address; - - private double price; -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java deleted file mode 100644 index 3c5c4a69e5..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.annotation.RetryableTopic; -import org.springframework.kafka.listener.KafkaBackoffException; -import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.stereotype.Component; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import groovy.util.logging.Slf4j; - -@Slf4j -@Component -public class OrderListener { - - private final OrderService orderService; - - private final ObjectMapper objectMapper; - - public OrderListener(OrderService orderService, ObjectMapper objectMapper) { - this.orderService = orderService; - this.objectMapper = objectMapper; - } - - @RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT) - @KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders") - public void handleOrders(String order) throws JsonProcessingException { - Order orderDetails = objectMapper.readValue(order, Order.class); - OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId()); - if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) { - orderService.processOrder(orderDetails); - } - } - -} diff --git a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java b/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java deleted file mode 100644 index f91e67c1d1..0000000000 --- a/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/delay/OrderService.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.springframework.stereotype.Service; - -@Service -public class OrderService { - - HashMap orders = new HashMap<>(); - - public Status findStatusById(UUID orderId) { - return Status.ORDER_CONFIRMED; - } - - public void processOrder(Order order) { - order.setOrderProcessedTime(LocalDateTime.now()); - orders.put(order.getOrderId(), order); - } - - public Map getOrders() { - return orders; - } - - enum Status { - CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED - } -} diff --git a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java b/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java deleted file mode 100644 index 8264e67bb2..0000000000 --- a/spring-kafka-3/src/test/java/com/baeldung/spring/kafka/delay/KafkaDelayIntegrationTest.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.baeldung.spring.kafka.delay; - -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; - -@Testcontainers -@SpringBootTest(classes = KafkaDelayApplication.class) -class KafkaDelayIntegrationTest { - - @Container - private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); - private static KafkaProducer testKafkaProducer; - private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - - @Autowired - OrderService orderService; - - @DynamicPropertySource - static void setProps(DynamicPropertyRegistry registry) { - registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers); - } - - @BeforeAll - static void beforeAll() { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - testKafkaProducer = new KafkaProducer<>(props); - } - - @Test - void givenKafkaBrokerExists_whenCreteOrderIsReceived_thenMessageShouldBeDelayed() throws Exception { - // Given - var orderId = UUID.randomUUID(); - Order order = Order.builder() - .orderId(orderId) - .price(1.0) - .orderGeneratedDateTime(LocalDateTime.now()) - .address(List.of("41 Felix Avenue, Luton")) - .build(); - - String orderString = objectMapper.writeValueAsString(order); - ProducerRecord record = new ProducerRecord<>("web.orders", orderString); - // When - testKafkaProducer.send(record) - .get(); - await().atMost(Duration.ofSeconds(1800)) - .until(() -> { - // then - Map orders = orderService.getOrders(); - return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds() >= 10; - }); - } - - @Test - void givenKafkaBrokerExists_whenCreteOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception { - // Given - var orderId = UUID.randomUUID(); - Order order = Order.builder() - .orderId(orderId) - .price(1.0) - .orderGeneratedDateTime(LocalDateTime.now()) - .address(List.of("41 Felix Avenue, Luton")) - .build(); - - String orderString = objectMapper.writeValueAsString(order); - ProducerRecord record = new ProducerRecord<>("web.internal.orders", orderString); - // When - testKafkaProducer.send(record) - .get(); - await().atMost(Duration.ofSeconds(1800)) - .until(() -> { - // Then - Map orders = orderService.getOrders(); - System.out.println("Time...." + Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds()); - return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId) - .getOrderGeneratedDateTime(), orders.get(orderId) - .getOrderProcessedTime()) - .getSeconds() <= 1; - }); - } - -} \ No newline at end of file