BAEL-7593: added implementation and test for kafka delayed message consumption

This commit is contained in:
balasr3 2024-03-26 23:43:48 +00:00
parent f682eba397
commit 98fcb6edef
8 changed files with 393 additions and 0 deletions

View File

@ -22,6 +22,7 @@
<dependency> <dependency>
<groupId>org.springframework.kafka</groupId> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId> <artifactId>spring-kafka</artifactId>
<version>2.9.13</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
@ -50,6 +51,16 @@
<version>${awaitility.version}</version> <version>${awaitility.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson-datatype.version}</version>
</dependency>
</dependencies> </dependencies>
<properties> <properties>
@ -57,5 +68,6 @@
<kafka-version>3.0.12</kafka-version> <kafka-version>3.0.12</kafka-version>
<testcontainers.version>1.19.3</testcontainers.version> <testcontainers.version>1.19.3</testcontainers.version>
<awaitility.version>4.2.0</awaitility.version> <awaitility.version>4.2.0</awaitility.version>
<jackson-datatype.version>2.13.5</jackson-datatype.version>
</properties> </properties>
</project> </project>

View File

@ -0,0 +1,93 @@
package com.baeldung.spring.kafka.delay;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> implements AcknowledgingConsumerAwareMessageListener<K, V> {
private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS);
private final String listenerId;
private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();
private Duration defaultDelay = DEFAULT_DELAY_VALUE;
public DelayedMessageListenerAdapter(MessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) {
super(delegate);
Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null");
Objects.requireNonNull(listenerId, "listenerId cannot be null");
this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
this.listenerId = listenerId;
}
@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
.toMillis(), consumer));
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
}
public void setDelayForTopic(String topic, Duration delay) {
Objects.requireNonNull(topic, "Topic cannot be null");
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.delaysPerTopic.put(topic, delay);
}
public void setDefaultDelay(Duration delay) {
Objects.requireNonNull(delay, "Delay cannot be null");
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
this.defaultDelay = delay;
}
private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
switch (this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
break;
case ACKNOWLEDGING:
this.delegate.onMessage(consumerRecord, acknowledgment);
break;
case CONSUMER_AWARE:
this.delegate.onMessage(consumerRecord, consumer);
break;
case SIMPLE:
this.delegate.onMessage(consumerRecord);
}
}
private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer);
}
@Override
public void onMessage(ConsumerRecord<K, V> data) {
onMessage(data, null, null);
}
@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
onMessage(data, acknowledgment, null);
}
@Override
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}
}

View File

@ -0,0 +1,64 @@
package com.baeldung.spring.kafka.delay;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager;
import org.springframework.kafka.listener.ContainerPausingBackOffHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerPauseService;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.RECORD);
factory.setContainerCustomizer(container -> {
DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
delayedAdapter.setDefaultDelay(Duration.ZERO);
container.setupMessageListener(delayedAdapter);
});
return factory;
}
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper().registerModule(new JavaTimeModule())
.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
}
@Bean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer<Object, Object> container) {
return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
.getMessageListener(), backOffManager, container.getListenerId());
}
private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
return new ContainerPartitionPausingBackOffManager(registry, new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.spring.kafka.delay;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
@EnableKafka
@SpringBootApplication
public class KafkaDelayApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDelayApplication.class, args);
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.spring.kafka.delay;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private UUID orderId;
private LocalDateTime orderGeneratedDateTime;
private LocalDateTime orderProcessedTime;
private List<String> address;
private double price;
}

View File

@ -0,0 +1,37 @@
package com.baeldung.spring.kafka.delay;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import groovy.util.logging.Slf4j;
@Slf4j
@Component
public class OrderListener {
private final OrderService orderService;
private final ObjectMapper objectMapper;
public OrderListener(OrderService orderService, ObjectMapper objectMapper) {
this.orderService = orderService;
this.objectMapper = objectMapper;
}
@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
Order orderDetails = objectMapper.readValue(order, Order.class);
OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
orderService.processOrder(orderDetails);
}
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.spring.kafka.delay;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
HashMap<UUID, Order> orders = new HashMap<>();
public Status findStatusById(UUID orderId) {
return Status.ORDER_CONFIRMED;
}
public void processOrder(Order order) {
order.setOrderProcessedTime(LocalDateTime.now());
orders.put(order.getOrderId(), order);
}
public Map<UUID, Order> getOrders() {
return orders;
}
enum Status {
CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED
}
}

View File

@ -0,0 +1,116 @@
package com.baeldung.spring.kafka.delay;
import static org.awaitility.Awaitility.await;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
@Testcontainers
@SpringBootTest(classes = KafkaDelayApplication.class)
class KafkaDelayIntegrationTest {
@Container
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
private static KafkaProducer<String, String> testKafkaProducer;
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
@Autowired
OrderService orderService;
@DynamicPropertySource
static void setProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
}
@BeforeAll
static void beforeAll() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
testKafkaProducer = new KafkaProducer<>(props);
}
@Test
void givenKafkaBrokerExists_whenCreteOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// then
Map<UUID, Order> orders = orderService.getOrders();
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() >= 10;
});
}
@Test
void givenKafkaBrokerExists_whenCreteOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
// Given
var orderId = UUID.randomUUID();
Order order = Order.builder()
.orderId(orderId)
.price(1.0)
.orderGeneratedDateTime(LocalDateTime.now())
.address(List.of("41 Felix Avenue, Luton"))
.build();
String orderString = objectMapper.writeValueAsString(order);
ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
// When
testKafkaProducer.send(record)
.get();
await().atMost(Duration.ofSeconds(1800))
.until(() -> {
// Then
Map<UUID, Order> orders = orderService.getOrders();
System.out.println("Time...." + Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds());
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
.getOrderGeneratedDateTime(), orders.get(orderId)
.getOrderProcessedTime())
.getSeconds() <= 1;
});
}
}