commit
fffce2e335
|
@ -0,0 +1,11 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
public class ExponentialBackoffApp {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(ExponentialBackoffApp.class, args);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
|
||||||
|
|
||||||
|
public class ObservableRejectAndDontRequeueRecoverer extends RejectAndDontRequeueRecoverer {
|
||||||
|
private Runnable observer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recover(Message message, Throwable cause) {
|
||||||
|
if(observer != null) {
|
||||||
|
observer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
super.recover(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setObserver(Runnable observer){
|
||||||
|
this.observer = observer;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,162 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import org.aopalliance.aop.Advice;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.amqp.core.AmqpAdmin;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.core.MessageProperties;
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
|
import org.springframework.amqp.core.QueueBuilder;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
|
||||||
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
|
||||||
|
@EnableRabbit
|
||||||
|
@Configuration
|
||||||
|
public class RabbitConfiguration {
|
||||||
|
|
||||||
|
private static Logger logger = LoggerFactory.getLogger(RabbitConfiguration.class);
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConnectionFactory connectionFactory() {
|
||||||
|
return new CachingConnectionFactory("localhost");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public AmqpAdmin amqpAdmin() {
|
||||||
|
return new RabbitAdmin(connectionFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RabbitTemplate rabbitTemplate() {
|
||||||
|
return new RabbitTemplate(connectionFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue blockingQueue() {
|
||||||
|
return QueueBuilder.nonDurable("blocking-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue nonBlockingQueue() {
|
||||||
|
return QueueBuilder.nonDurable("non-blocking-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue retryWaitEndedQueue() {
|
||||||
|
return QueueBuilder.nonDurable("retry-wait-ended-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue retryQueue1() {
|
||||||
|
return QueueBuilder.nonDurable("retry-queue-1")
|
||||||
|
.deadLetterExchange("")
|
||||||
|
.deadLetterRoutingKey("retry-wait-ended-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue retryQueue2() {
|
||||||
|
return QueueBuilder.nonDurable("retry-queue-2")
|
||||||
|
.deadLetterExchange("")
|
||||||
|
.deadLetterRoutingKey("retry-wait-ended-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Queue retryQueue3() {
|
||||||
|
return QueueBuilder.nonDurable("retry-queue-3")
|
||||||
|
.deadLetterExchange("")
|
||||||
|
.deadLetterRoutingKey("retry-wait-ended-queue")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RetryQueues retryQueues() {
|
||||||
|
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ObservableRejectAndDontRequeueRecoverer observableRecoverer() {
|
||||||
|
return new ObservableRejectAndDontRequeueRecoverer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RetryOperationsInterceptor retryInterceptor() {
|
||||||
|
return RetryInterceptorBuilder.stateless()
|
||||||
|
.backOffOptions(1000, 3.0, 10000)
|
||||||
|
.maxAttempts(5)
|
||||||
|
.recoverer(observableRecoverer())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RetryQueuesInterceptor retryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
|
||||||
|
return new RetryQueuesInterceptor(rabbitTemplate, retryQueues);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleRabbitListenerContainerFactory defaultContainerFactory(ConnectionFactory connectionFactory) {
|
||||||
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
|
factory.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleRabbitListenerContainerFactory retryContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
|
||||||
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
|
factory.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
Advice[] adviceChain = { retryInterceptor };
|
||||||
|
factory.setAdviceChain(adviceChain);
|
||||||
|
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
|
||||||
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
|
factory.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
Advice[] adviceChain = { retryInterceptor };
|
||||||
|
factory.setAdviceChain(adviceChain);
|
||||||
|
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
|
||||||
|
public void consumeBlocking(String payload) throws Exception {
|
||||||
|
logger.info("Processing message from blocking-queue: {}", payload);
|
||||||
|
|
||||||
|
throw new Exception("exception occured!");
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", ackMode = "MANUAL")
|
||||||
|
public void consumeNonBlocking(String payload) throws Exception {
|
||||||
|
logger.info("Processing message from non-blocking-queue: {}", payload);
|
||||||
|
|
||||||
|
throw new Exception("Error occured!");
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
|
||||||
|
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
|
||||||
|
MessageProperties props = message.getMessageProperties();
|
||||||
|
|
||||||
|
rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), props.getHeader("x-original-routing-key"), message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
|
|
||||||
|
public class RetryQueues {
|
||||||
|
private Queue[] queues;
|
||||||
|
private long initialInterval;
|
||||||
|
private double factor;
|
||||||
|
private long maxWait;
|
||||||
|
|
||||||
|
public RetryQueues(long initialInterval, double factor, long maxWait, Queue... queues) {
|
||||||
|
this.queues = queues;
|
||||||
|
this.initialInterval = initialInterval;
|
||||||
|
this.factor = factor;
|
||||||
|
this.maxWait = maxWait;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean retriesExhausted(int retry) {
|
||||||
|
return retry >= queues.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQueueName(int retry) {
|
||||||
|
return queues[retry].getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimeToWait(int retry) {
|
||||||
|
double time = initialInterval * Math.pow(factor, (double) retry);
|
||||||
|
if (time > maxWait) {
|
||||||
|
return maxWait;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (long) time;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import com.rabbitmq.client.Channel;
|
||||||
|
|
||||||
|
import org.aopalliance.intercept.MethodInterceptor;
|
||||||
|
import org.aopalliance.intercept.MethodInvocation;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.core.MessageProperties;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
|
||||||
|
public class RetryQueuesInterceptor implements MethodInterceptor {
|
||||||
|
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
private RetryQueues retryQueues;
|
||||||
|
|
||||||
|
private Runnable observer;
|
||||||
|
|
||||||
|
public RetryQueuesInterceptor(RabbitTemplate rabbitTemplate, RetryQueues retryQueues) {
|
||||||
|
this.rabbitTemplate = rabbitTemplate;
|
||||||
|
this.retryQueues = retryQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object invoke(MethodInvocation invocation) throws Throwable {
|
||||||
|
return tryConsume(invocation, this::ack, (mac, e) -> {
|
||||||
|
try {
|
||||||
|
int retryCount = tryGetRetryCountOrFail(mac, e);
|
||||||
|
sendToNextRetryQueue(mac, retryCount);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
if (observer != null) {
|
||||||
|
observer.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new RuntimeException(t);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void setObserver(Runnable observer) {
|
||||||
|
this.observer = observer;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object tryConsume(MethodInvocation invocation, Consumer<MessageAndChannel> successHandler, BiConsumer<MessageAndChannel, Throwable> errorHandler) throws Throwable {
|
||||||
|
MessageAndChannel mac = new MessageAndChannel((Message) invocation.getArguments()[1], (Channel) invocation.getArguments()[0]);
|
||||||
|
Object ret = null;
|
||||||
|
try {
|
||||||
|
ret = invocation.proceed();
|
||||||
|
successHandler.accept(mac);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
errorHandler.accept(mac, e);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ack(MessageAndChannel mac) {
|
||||||
|
try {
|
||||||
|
mac.channel.basicAck(mac.message.getMessageProperties()
|
||||||
|
.getDeliveryTag(), false);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int tryGetRetryCountOrFail(MessageAndChannel mac, Throwable originalError) throws Throwable {
|
||||||
|
MessageProperties props = mac.message.getMessageProperties();
|
||||||
|
|
||||||
|
String xRetriedCountHeader = (String) props.getHeader("x-retried-count");
|
||||||
|
final int xRetriedCount = xRetriedCountHeader == null ? 0 : Integer.valueOf(xRetriedCountHeader);
|
||||||
|
|
||||||
|
if (retryQueues.retriesExhausted(xRetriedCount)) {
|
||||||
|
mac.channel.basicReject(props.getDeliveryTag(), false);
|
||||||
|
|
||||||
|
throw originalError;
|
||||||
|
}
|
||||||
|
|
||||||
|
return xRetriedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
|
||||||
|
String retryQueueName = retryQueues.getQueueName(retryCount);
|
||||||
|
|
||||||
|
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
|
||||||
|
MessageProperties props = m.getMessageProperties();
|
||||||
|
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
|
||||||
|
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
|
||||||
|
props.setHeader("x-original-exchange", props.getReceivedExchange());
|
||||||
|
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
|
||||||
|
|
||||||
|
return m;
|
||||||
|
});
|
||||||
|
|
||||||
|
mac.channel.basicReject(mac.message.getMessageProperties()
|
||||||
|
.getDeliveryTag(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MessageAndChannel {
|
||||||
|
private Message message;
|
||||||
|
private Channel channel;
|
||||||
|
|
||||||
|
private MessageAndChannel(Message message, Channel channel) {
|
||||||
|
this.message = message;
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
package com.baeldung.springamqp.exponentialbackoff;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This live test requires:
|
||||||
|
*
|
||||||
|
* - A running RabbitMQ instance on localhost (e.g. docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management)
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@ContextConfiguration(classes = { RabbitConfiguration.class })
|
||||||
|
public class ExponentialBackoffLiveTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ObservableRejectAndDontRequeueRecoverer observableRecoverer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RetryQueuesInterceptor retryQueues;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
|
||||||
|
int nb = 2;
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(nb);
|
||||||
|
observableRecoverer.setObserver(() -> latch.countDown());
|
||||||
|
|
||||||
|
for (int i = 1; i <= nb; i++) {
|
||||||
|
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
|
||||||
|
int nb = 2;
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(nb);
|
||||||
|
retryQueues.setObserver(() -> latch.countDown());
|
||||||
|
|
||||||
|
for (int i = 1; i <= nb; i++) {
|
||||||
|
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<configuration>
|
||||||
|
<include
|
||||||
|
resource="org/springframework/boot/logging/logback/base.xml" />
|
||||||
|
<logger name="org.springframework" level="INFO" />
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue