Revert "BAEL-7593: added implementation and test for kafka delayed message consumption"
This reverts commit 98fcb6edef
.
This commit is contained in:
parent
98fcb6edef
commit
95b7834f6a
|
@ -22,7 +22,6 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.kafka</groupId>
|
<groupId>org.springframework.kafka</groupId>
|
||||||
<artifactId>spring-kafka</artifactId>
|
<artifactId>spring-kafka</artifactId>
|
||||||
<version>2.9.13</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
@ -51,16 +50,6 @@
|
||||||
<version>${awaitility.version}</version>
|
<version>${awaitility.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.projectlombok</groupId>
|
|
||||||
<artifactId>lombok</artifactId>
|
|
||||||
<version>${lombok.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
|
||||||
<artifactId>jackson-datatype-jsr310</artifactId>
|
|
||||||
<version>${jackson-datatype.version}</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -68,6 +57,5 @@
|
||||||
<kafka-version>3.0.12</kafka-version>
|
<kafka-version>3.0.12</kafka-version>
|
||||||
<testcontainers.version>1.19.3</testcontainers.version>
|
<testcontainers.version>1.19.3</testcontainers.version>
|
||||||
<awaitility.version>4.2.0</awaitility.version>
|
<awaitility.version>4.2.0</awaitility.version>
|
||||||
<jackson-datatype.version>2.13.5</jackson-datatype.version>
|
|
||||||
</properties>
|
</properties>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -1,93 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.temporal.ChronoUnit;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.Consumer;
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
||||||
import org.apache.kafka.common.TopicPartition;
|
|
||||||
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
|
|
||||||
import org.springframework.kafka.listener.KafkaBackoffException;
|
|
||||||
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
|
|
||||||
import org.springframework.kafka.listener.MessageListener;
|
|
||||||
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
|
|
||||||
import org.springframework.kafka.support.Acknowledgment;
|
|
||||||
|
|
||||||
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> implements AcknowledgingConsumerAwareMessageListener<K, V> {
|
|
||||||
|
|
||||||
private static final Duration DEFAULT_DELAY_VALUE = Duration.of(0, ChronoUnit.SECONDS);
|
|
||||||
|
|
||||||
private final String listenerId;
|
|
||||||
|
|
||||||
private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
|
|
||||||
|
|
||||||
private final Map<String, Duration> delaysPerTopic = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private Duration defaultDelay = DEFAULT_DELAY_VALUE;
|
|
||||||
|
|
||||||
public DelayedMessageListenerAdapter(MessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId) {
|
|
||||||
super(delegate);
|
|
||||||
Objects.requireNonNull(kafkaConsumerBackoffManager, "kafkaConsumerBackoffManager cannot be null");
|
|
||||||
Objects.requireNonNull(listenerId, "listenerId cannot be null");
|
|
||||||
this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
|
|
||||||
this.listenerId = listenerId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
|
|
||||||
this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord, consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
|
|
||||||
.toMillis(), consumer));
|
|
||||||
invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDelayForTopic(String topic, Duration delay) {
|
|
||||||
Objects.requireNonNull(topic, "Topic cannot be null");
|
|
||||||
Objects.requireNonNull(delay, "Delay cannot be null");
|
|
||||||
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
|
|
||||||
this.delaysPerTopic.put(topic, delay);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDefaultDelay(Duration delay) {
|
|
||||||
Objects.requireNonNull(delay, "Delay cannot be null");
|
|
||||||
this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
|
|
||||||
this.defaultDelay = delay;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
|
|
||||||
switch (this.delegateType) {
|
|
||||||
case ACKNOWLEDGING_CONSUMER_AWARE:
|
|
||||||
this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
|
|
||||||
break;
|
|
||||||
case ACKNOWLEDGING:
|
|
||||||
this.delegate.onMessage(consumerRecord, acknowledgment);
|
|
||||||
break;
|
|
||||||
case CONSUMER_AWARE:
|
|
||||||
this.delegate.onMessage(consumerRecord, consumer);
|
|
||||||
break;
|
|
||||||
case SIMPLE:
|
|
||||||
this.delegate.onMessage(consumerRecord);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
|
|
||||||
return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, new TopicPartition(data.topic(), data.partition()), consumer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(ConsumerRecord<K, V> data) {
|
|
||||||
onMessage(data, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
|
|
||||||
onMessage(data, acknowledgment, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
|
|
||||||
onMessage(data, null, consumer);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,64 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
||||||
import org.springframework.kafka.core.ConsumerFactory;
|
|
||||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
|
||||||
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManager;
|
|
||||||
import org.springframework.kafka.listener.ContainerPausingBackOffHandler;
|
|
||||||
import org.springframework.kafka.listener.ContainerProperties;
|
|
||||||
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
|
|
||||||
import org.springframework.kafka.listener.ListenerContainerPauseService;
|
|
||||||
import org.springframework.kafka.listener.ListenerContainerRegistry;
|
|
||||||
import org.springframework.kafka.listener.MessageListener;
|
|
||||||
import org.springframework.scheduling.TaskScheduler;
|
|
||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class KafkaConsumerConfig {
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, ListenerContainerRegistry registry, TaskScheduler scheduler) {
|
|
||||||
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
||||||
factory.setConsumerFactory(consumerFactory);
|
|
||||||
KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
|
|
||||||
factory.getContainerProperties()
|
|
||||||
.setAckMode(ContainerProperties.AckMode.RECORD);
|
|
||||||
factory.setContainerCustomizer(container -> {
|
|
||||||
DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
|
|
||||||
delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
|
|
||||||
delayedAdapter.setDefaultDelay(Duration.ZERO);
|
|
||||||
container.setupMessageListener(delayedAdapter);
|
|
||||||
});
|
|
||||||
return factory;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ObjectMapper objectMapper() {
|
|
||||||
return new ObjectMapper().registerModule(new JavaTimeModule())
|
|
||||||
.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public TaskScheduler taskScheduler() {
|
|
||||||
return new ThreadPoolTaskScheduler();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, ConcurrentMessageListenerContainer<Object, Object> container) {
|
|
||||||
return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
|
|
||||||
.getMessageListener(), backOffManager, container.getListenerId());
|
|
||||||
}
|
|
||||||
|
|
||||||
private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
|
|
||||||
return new ContainerPartitionPausingBackOffManager(registry, new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,13 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import org.springframework.boot.SpringApplication;
|
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
||||||
import org.springframework.kafka.annotation.EnableKafka;
|
|
||||||
|
|
||||||
@EnableKafka
|
|
||||||
@SpringBootApplication
|
|
||||||
public class KafkaDelayApplication {
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(KafkaDelayApplication.class, args);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Builder;
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
@Data
|
|
||||||
@Builder
|
|
||||||
@AllArgsConstructor
|
|
||||||
@NoArgsConstructor
|
|
||||||
public class Order {
|
|
||||||
|
|
||||||
private UUID orderId;
|
|
||||||
|
|
||||||
private LocalDateTime orderGeneratedDateTime;
|
|
||||||
|
|
||||||
private LocalDateTime orderProcessedTime;
|
|
||||||
|
|
||||||
private List<String> address;
|
|
||||||
|
|
||||||
private double price;
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
|
||||||
import org.springframework.kafka.annotation.RetryableTopic;
|
|
||||||
import org.springframework.kafka.listener.KafkaBackoffException;
|
|
||||||
import org.springframework.kafka.retrytopic.DltStrategy;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import groovy.util.logging.Slf4j;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
public class OrderListener {
|
|
||||||
|
|
||||||
private final OrderService orderService;
|
|
||||||
|
|
||||||
private final ObjectMapper objectMapper;
|
|
||||||
|
|
||||||
public OrderListener(OrderService orderService, ObjectMapper objectMapper) {
|
|
||||||
this.orderService = orderService;
|
|
||||||
this.objectMapper = objectMapper;
|
|
||||||
}
|
|
||||||
|
|
||||||
@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
|
|
||||||
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
|
|
||||||
public void handleOrders(String order) throws JsonProcessingException {
|
|
||||||
Order orderDetails = objectMapper.readValue(order, Order.class);
|
|
||||||
OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
|
|
||||||
if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
|
|
||||||
orderService.processOrder(orderDetails);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,31 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class OrderService {
|
|
||||||
|
|
||||||
HashMap<UUID, Order> orders = new HashMap<>();
|
|
||||||
|
|
||||||
public Status findStatusById(UUID orderId) {
|
|
||||||
return Status.ORDER_CONFIRMED;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void processOrder(Order order) {
|
|
||||||
order.setOrderProcessedTime(LocalDateTime.now());
|
|
||||||
orders.put(order.getOrderId(), order);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<UUID, Order> getOrders() {
|
|
||||||
return orders;
|
|
||||||
}
|
|
||||||
|
|
||||||
enum Status {
|
|
||||||
CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,116 +0,0 @@
|
||||||
package com.baeldung.spring.kafka.delay;
|
|
||||||
|
|
||||||
import static org.awaitility.Awaitility.await;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
|
||||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
|
||||||
import org.springframework.test.context.DynamicPropertySource;
|
|
||||||
import org.testcontainers.containers.KafkaContainer;
|
|
||||||
import org.testcontainers.junit.jupiter.Container;
|
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
|
||||||
import org.testcontainers.utility.DockerImageName;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
|
||||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
|
||||||
|
|
||||||
@Testcontainers
|
|
||||||
@SpringBootTest(classes = KafkaDelayApplication.class)
|
|
||||||
class KafkaDelayIntegrationTest {
|
|
||||||
|
|
||||||
@Container
|
|
||||||
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
|
|
||||||
private static KafkaProducer<String, String> testKafkaProducer;
|
|
||||||
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
|
|
||||||
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
OrderService orderService;
|
|
||||||
|
|
||||||
@DynamicPropertySource
|
|
||||||
static void setProps(DynamicPropertyRegistry registry) {
|
|
||||||
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
static void beforeAll() {
|
|
||||||
Properties props = new Properties();
|
|
||||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
|
|
||||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
|
||||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
|
|
||||||
testKafkaProducer = new KafkaProducer<>(props);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void givenKafkaBrokerExists_whenCreteOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
|
|
||||||
// Given
|
|
||||||
var orderId = UUID.randomUUID();
|
|
||||||
Order order = Order.builder()
|
|
||||||
.orderId(orderId)
|
|
||||||
.price(1.0)
|
|
||||||
.orderGeneratedDateTime(LocalDateTime.now())
|
|
||||||
.address(List.of("41 Felix Avenue, Luton"))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
String orderString = objectMapper.writeValueAsString(order);
|
|
||||||
ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
|
|
||||||
// When
|
|
||||||
testKafkaProducer.send(record)
|
|
||||||
.get();
|
|
||||||
await().atMost(Duration.ofSeconds(1800))
|
|
||||||
.until(() -> {
|
|
||||||
// then
|
|
||||||
Map<UUID, Order> orders = orderService.getOrders();
|
|
||||||
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
|
|
||||||
.getOrderGeneratedDateTime(), orders.get(orderId)
|
|
||||||
.getOrderProcessedTime())
|
|
||||||
.getSeconds() >= 10;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void givenKafkaBrokerExists_whenCreteOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
|
|
||||||
// Given
|
|
||||||
var orderId = UUID.randomUUID();
|
|
||||||
Order order = Order.builder()
|
|
||||||
.orderId(orderId)
|
|
||||||
.price(1.0)
|
|
||||||
.orderGeneratedDateTime(LocalDateTime.now())
|
|
||||||
.address(List.of("41 Felix Avenue, Luton"))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
String orderString = objectMapper.writeValueAsString(order);
|
|
||||||
ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
|
|
||||||
// When
|
|
||||||
testKafkaProducer.send(record)
|
|
||||||
.get();
|
|
||||||
await().atMost(Duration.ofSeconds(1800))
|
|
||||||
.until(() -> {
|
|
||||||
// Then
|
|
||||||
Map<UUID, Order> orders = orderService.getOrders();
|
|
||||||
System.out.println("Time...." + Duration.between(orders.get(orderId)
|
|
||||||
.getOrderGeneratedDateTime(), orders.get(orderId)
|
|
||||||
.getOrderProcessedTime())
|
|
||||||
.getSeconds());
|
|
||||||
return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
|
|
||||||
.getOrderGeneratedDateTime(), orders.get(orderId)
|
|
||||||
.getOrderProcessedTime())
|
|
||||||
.getSeconds() <= 1;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue