BAEL-3337 main code + live tests (#8777)

This commit is contained in:
Mathieu Fortin 2020-02-25 20:24:42 -05:00 committed by GitHub
parent 0df2b1bb4b
commit 8073a286b0
7 changed files with 401 additions and 0 deletions

View File

@ -0,0 +1,11 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ExponentialBackoffApp {
public static void main(String[] args) {
SpringApplication.run(ExponentialBackoffApp.class, args);
}
}

View File

@ -0,0 +1,21 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
public class ObservableRejectAndDontRequeueRecoverer extends RejectAndDontRequeueRecoverer {
private Runnable observer;
@Override
public void recover(Message message, Throwable cause) {
if(observer != null) {
observer.run();
}
super.recover(message, cause);
}
void setObserver(Runnable observer){
this.observer = observer;
}
}

View File

@ -0,0 +1,162 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.aopalliance.aop.Advice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import com.rabbitmq.client.Channel;
@EnableRabbit
@Configuration
public class RabbitConfiguration {
private static Logger logger = LoggerFactory.getLogger(RabbitConfiguration.class);
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue")
.build();
}
@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue")
.build();
}
@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}
@Bean
public ObservableRejectAndDontRequeueRecoverer observableRecoverer() {
return new ObservableRejectAndDontRequeueRecoverer();
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(5)
.recoverer(observableRecoverer())
.build();
}
@Bean
public RetryQueuesInterceptor retryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
return new RetryQueuesInterceptor(rabbitTemplate, retryQueues);
}
@Bean
public SimpleRabbitListenerContainerFactory defaultContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);
throw new Exception("exception occured!");
}
@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);
throw new Exception("Error occured!");
}
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
MessageProperties props = message.getMessageProperties();
rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), props.getHeader("x-original-routing-key"), message);
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.springamqp.exponentialbackoff;
import org.springframework.amqp.core.Queue;
public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;
public RetryQueues(long initialInterval, double factor, long maxWait, Queue... queues) {
this.queues = queues;
this.initialInterval = initialInterval;
this.factor = factor;
this.maxWait = maxWait;
}
public boolean retriesExhausted(int retry) {
return retry >= queues.length;
}
public String getQueueName(int retry) {
return queues[retry].getName();
}
public long getTimeToWait(int retry) {
double time = initialInterval * Math.pow(factor, (double) retry);
if (time > maxWait) {
return maxWait;
}
return (long) time;
}
}

View File

@ -0,0 +1,109 @@
package com.baeldung.springamqp.exponentialbackoff;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.rabbitmq.client.Channel;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RetryQueuesInterceptor implements MethodInterceptor {
private RabbitTemplate rabbitTemplate;
private RetryQueues retryQueues;
private Runnable observer;
public RetryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
this.rabbitTemplate = rabbitTemplate;
this.retryQueues = retryQueues;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (mac, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(mac, e);
sendToNextRetryQueue(mac, retryCount);
} catch (Throwable t) {
if (observer != null) {
observer.run();
}
throw new RuntimeException(t);
}
});
}
void setObserver(Runnable observer) {
this.observer = observer;
}
private Object tryConsume(MethodInvocation invocation, Consumer<MessageAndChannel> successHandler, BiConsumer<MessageAndChannel, Throwable> errorHandler) throws Throwable {
MessageAndChannel mac = new MessageAndChannel((Message) invocation.getArguments()[1], (Channel) invocation.getArguments()[0]);
Object ret = null;
try {
ret = invocation.proceed();
successHandler.accept(mac);
} catch (Throwable e) {
errorHandler.accept(mac, e);
}
return ret;
}
private void ack(MessageAndChannel mac) {
try {
mac.channel.basicAck(mac.message.getMessageProperties()
.getDeliveryTag(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private int tryGetRetryCountOrFail(MessageAndChannel mac, Throwable originalError) throws Throwable {
MessageProperties props = mac.message.getMessageProperties();
String xRetriedCountHeader = (String) props.getHeader("x-retried-count");
final int xRetriedCount = xRetriedCountHeader == null ? 0 : Integer.valueOf(xRetriedCountHeader);
if (retryQueues.retriesExhausted(xRetriedCount)) {
mac.channel.basicReject(props.getDeliveryTag(), false);
throw originalError;
}
return xRetriedCount;
}
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
return m;
});
mac.channel.basicReject(mac.message.getMessageProperties()
.getDeliveryTag(), false);
}
private class MessageAndChannel {
private Message message;
private Channel channel;
private MessageAndChannel(Message message, Channel channel) {
this.message = message;
this.channel = channel;
}
}
}

View File

@ -0,0 +1,58 @@
package com.baeldung.springamqp.exponentialbackoff;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* This live test requires:
*
* - A running RabbitMQ instance on localhost (e.g. docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management)
*
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { RabbitConfiguration.class })
public class ExponentialBackoffLiveTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObservableRejectAndDontRequeueRecoverer observableRecoverer;
@Autowired
private RetryQueuesInterceptor retryQueues;
@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}
latch.await();
}
@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}
latch.await();
}
}

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include
resource="org/springframework/boot/logging/logback/base.xml" />
<logger name="org.springframework" level="INFO" />
</configuration>