From 8073a286b0385f64e504ed55e260b08ee5e01858 Mon Sep 17 00:00:00 2001 From: Mathieu Fortin Date: Tue, 25 Feb 2020 20:24:42 -0500 Subject: [PATCH] BAEL-3337 main code + live tests (#8777) --- .../ExponentialBackoffApp.java | 11 ++ ...servableRejectAndDontRequeueRecoverer.java | 21 +++ .../RabbitConfiguration.java | 162 ++++++++++++++++++ .../exponentialbackoff/RetryQueues.java | 34 ++++ .../RetryQueuesInterceptor.java | 109 ++++++++++++ .../ExponentialBackoffLiveTest.java | 58 +++++++ .../src/test/resources/logback-test.xml | 6 + 7 files changed, 401 insertions(+) create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffApp.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ObservableRejectAndDontRequeueRecoverer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RabbitConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueues.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueuesInterceptor.java create mode 100644 spring-amqp/src/test/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffLiveTest.java create mode 100644 spring-amqp/src/test/resources/logback-test.xml diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffApp.java new file mode 100644 index 0000000000..182880cc7e --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffApp.java @@ -0,0 +1,11 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ExponentialBackoffApp { + public static void main(String[] args) { + SpringApplication.run(ExponentialBackoffApp.class, args); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ObservableRejectAndDontRequeueRecoverer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ObservableRejectAndDontRequeueRecoverer.java new file mode 100644 index 0000000000..700f7ab725 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/ObservableRejectAndDontRequeueRecoverer.java @@ -0,0 +1,21 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; + +public class ObservableRejectAndDontRequeueRecoverer extends RejectAndDontRequeueRecoverer { + private Runnable observer; + + @Override + public void recover(Message message, Throwable cause) { + if(observer != null) { + observer.run(); + } + + super.recover(message, cause); + } + + void setObserver(Runnable observer){ + this.observer = observer; + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RabbitConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RabbitConfiguration.java new file mode 100644 index 0000000000..2335ecbf58 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RabbitConfiguration.java @@ -0,0 +1,162 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import org.aopalliance.aop.Advice; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.interceptor.RetryOperationsInterceptor; + +import com.rabbitmq.client.Channel; + +@EnableRabbit +@Configuration +public class RabbitConfiguration { + + private static Logger logger = LoggerFactory.getLogger(RabbitConfiguration.class); + + @Bean + public ConnectionFactory connectionFactory() { + return new CachingConnectionFactory("localhost"); + } + + @Bean + public AmqpAdmin amqpAdmin() { + return new RabbitAdmin(connectionFactory()); + } + + @Bean + public RabbitTemplate rabbitTemplate() { + return new RabbitTemplate(connectionFactory()); + } + + @Bean + public Queue blockingQueue() { + return QueueBuilder.nonDurable("blocking-queue") + .build(); + } + + @Bean + public Queue nonBlockingQueue() { + return QueueBuilder.nonDurable("non-blocking-queue") + .build(); + } + + @Bean + public Queue retryWaitEndedQueue() { + return QueueBuilder.nonDurable("retry-wait-ended-queue") + .build(); + } + + @Bean + public Queue retryQueue1() { + return QueueBuilder.nonDurable("retry-queue-1") + .deadLetterExchange("") + .deadLetterRoutingKey("retry-wait-ended-queue") + .build(); + } + + @Bean + public Queue retryQueue2() { + return QueueBuilder.nonDurable("retry-queue-2") + .deadLetterExchange("") + .deadLetterRoutingKey("retry-wait-ended-queue") + .build(); + } + + @Bean + public Queue retryQueue3() { + return QueueBuilder.nonDurable("retry-queue-3") + .deadLetterExchange("") + .deadLetterRoutingKey("retry-wait-ended-queue") + .build(); + } + + @Bean + public RetryQueues retryQueues() { + return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3()); + } + + @Bean + public ObservableRejectAndDontRequeueRecoverer observableRecoverer() { + return new ObservableRejectAndDontRequeueRecoverer(); + } + + @Bean + public RetryOperationsInterceptor retryInterceptor() { + return RetryInterceptorBuilder.stateless() + .backOffOptions(1000, 3.0, 10000) + .maxAttempts(5) + .recoverer(observableRecoverer()) + .build(); + } + + @Bean + public RetryQueuesInterceptor retryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) { + return new RetryQueuesInterceptor(rabbitTemplate, retryQueues); + } + + @Bean + public SimpleRabbitListenerContainerFactory defaultContainerFactory(ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + + return factory; + } + + @Bean + public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + + Advice[] adviceChain = { retryInterceptor }; + factory.setAdviceChain(adviceChain); + + return factory; + } + + @Bean + public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + + Advice[] adviceChain = { retryInterceptor }; + factory.setAdviceChain(adviceChain); + + return factory; + } + + @RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory") + public void consumeBlocking(String payload) throws Exception { + logger.info("Processing message from blocking-queue: {}", payload); + + throw new Exception("exception occured!"); + } + + @RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", ackMode = "MANUAL") + public void consumeNonBlocking(String payload) throws Exception { + logger.info("Processing message from non-blocking-queue: {}", payload); + + throw new Exception("Error occured!"); + } + + @RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory") + public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception { + MessageProperties props = message.getMessageProperties(); + + rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), props.getHeader("x-original-routing-key"), message); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueues.java b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueues.java new file mode 100644 index 0000000000..f52415b3e6 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueues.java @@ -0,0 +1,34 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import org.springframework.amqp.core.Queue; + +public class RetryQueues { + private Queue[] queues; + private long initialInterval; + private double factor; + private long maxWait; + + public RetryQueues(long initialInterval, double factor, long maxWait, Queue... queues) { + this.queues = queues; + this.initialInterval = initialInterval; + this.factor = factor; + this.maxWait = maxWait; + } + + public boolean retriesExhausted(int retry) { + return retry >= queues.length; + } + + public String getQueueName(int retry) { + return queues[retry].getName(); + } + + public long getTimeToWait(int retry) { + double time = initialInterval * Math.pow(factor, (double) retry); + if (time > maxWait) { + return maxWait; + } + + return (long) time; + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueuesInterceptor.java b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueuesInterceptor.java new file mode 100644 index 0000000000..f09a71df7c --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/exponentialbackoff/RetryQueuesInterceptor.java @@ -0,0 +1,109 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import com.rabbitmq.client.Channel; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +public class RetryQueuesInterceptor implements MethodInterceptor { + + private RabbitTemplate rabbitTemplate; + + private RetryQueues retryQueues; + + private Runnable observer; + + public RetryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) { + this.rabbitTemplate = rabbitTemplate; + this.retryQueues = retryQueues; + } + + @Override + public Object invoke(MethodInvocation invocation) throws Throwable { + return tryConsume(invocation, this::ack, (mac, e) -> { + try { + int retryCount = tryGetRetryCountOrFail(mac, e); + sendToNextRetryQueue(mac, retryCount); + } catch (Throwable t) { + if (observer != null) { + observer.run(); + } + + throw new RuntimeException(t); + } + }); + } + + void setObserver(Runnable observer) { + this.observer = observer; + } + + private Object tryConsume(MethodInvocation invocation, Consumer successHandler, BiConsumer errorHandler) throws Throwable { + MessageAndChannel mac = new MessageAndChannel((Message) invocation.getArguments()[1], (Channel) invocation.getArguments()[0]); + Object ret = null; + try { + ret = invocation.proceed(); + successHandler.accept(mac); + } catch (Throwable e) { + errorHandler.accept(mac, e); + } + return ret; + } + + private void ack(MessageAndChannel mac) { + try { + mac.channel.basicAck(mac.message.getMessageProperties() + .getDeliveryTag(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private int tryGetRetryCountOrFail(MessageAndChannel mac, Throwable originalError) throws Throwable { + MessageProperties props = mac.message.getMessageProperties(); + + String xRetriedCountHeader = (String) props.getHeader("x-retried-count"); + final int xRetriedCount = xRetriedCountHeader == null ? 0 : Integer.valueOf(xRetriedCountHeader); + + if (retryQueues.retriesExhausted(xRetriedCount)) { + mac.channel.basicReject(props.getDeliveryTag(), false); + + throw originalError; + } + + return xRetriedCount; + } + + private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception { + String retryQueueName = retryQueues.getQueueName(retryCount); + + rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> { + MessageProperties props = m.getMessageProperties(); + props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount))); + props.setHeader("x-retried-count", String.valueOf(retryCount + 1)); + props.setHeader("x-original-exchange", props.getReceivedExchange()); + props.setHeader("x-original-routing-key", props.getReceivedRoutingKey()); + + return m; + }); + + mac.channel.basicReject(mac.message.getMessageProperties() + .getDeliveryTag(), false); + } + + private class MessageAndChannel { + private Message message; + private Channel channel; + + private MessageAndChannel(Message message, Channel channel) { + this.message = message; + this.channel = channel; + } + } +} \ No newline at end of file diff --git a/spring-amqp/src/test/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffLiveTest.java b/spring-amqp/src/test/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffLiveTest.java new file mode 100644 index 0000000000..04bd463a72 --- /dev/null +++ b/spring-amqp/src/test/java/com/baeldung/springamqp/exponentialbackoff/ExponentialBackoffLiveTest.java @@ -0,0 +1,58 @@ +package com.baeldung.springamqp.exponentialbackoff; + +import java.util.concurrent.CountDownLatch; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * This live test requires: + * + * - A running RabbitMQ instance on localhost (e.g. docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management) + * + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = { RabbitConfiguration.class }) +public class ExponentialBackoffLiveTest { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Autowired + private ObservableRejectAndDontRequeueRecoverer observableRecoverer; + + @Autowired + private RetryQueuesInterceptor retryQueues; + + @Test + public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception { + int nb = 2; + + CountDownLatch latch = new CountDownLatch(nb); + observableRecoverer.setObserver(() -> latch.countDown()); + + for (int i = 1; i <= nb; i++) { + rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i); + } + + latch.await(); + } + + @Test + public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception { + int nb = 2; + + CountDownLatch latch = new CountDownLatch(nb); + retryQueues.setObserver(() -> latch.countDown()); + + for (int i = 1; i <= nb; i++) { + rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i); + } + + latch.await(); + } +} diff --git a/spring-amqp/src/test/resources/logback-test.xml b/spring-amqp/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..7eba9aea8b --- /dev/null +++ b/spring-amqp/src/test/resources/logback-test.xml @@ -0,0 +1,6 @@ + + + + +