Merge pull request #8035 from CROSP/BAEL-3200
BAEL-3200 Error handling with Spring AMQP
This commit is contained in:
commit
be6021a4af
|
@ -0,0 +1,26 @@
|
|||
package com.baeldung.springamqp.errorhandling;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.producer.MessageProducer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class ErrorHandlingApp {
|
||||
|
||||
@Autowired
|
||||
MessageProducer messageProducer;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ErrorHandlingApp.class, args);
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void doSomethingAfterStartup() {
|
||||
messageProducer.sendMessage();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "dlx-custom")
|
||||
public class DLXCustomAmqpConfiguration {
|
||||
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
FanoutExchange deadLetterExchange() {
|
||||
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue deadLetterQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding deadLetterBinding() {
|
||||
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "parking-lot-dlx")
|
||||
public class DLXParkingLotAmqpConfiguration {
|
||||
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
|
||||
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
|
||||
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
|
||||
|
||||
@Bean
|
||||
FanoutExchange parkingLotExchange() {
|
||||
return new FanoutExchange(EXCHANGE_PARKING_LOT);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue parkingLotQueue() {
|
||||
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding parkingLotBinding() {
|
||||
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
FanoutExchange deadLetterExchange() {
|
||||
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue deadLetterQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding deadLetterBinding() {
|
||||
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
|
||||
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "fatal-error-strategy")
|
||||
public class FatalExceptionStrategyAmqpConfiguration {
|
||||
|
||||
@Bean
|
||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||
ConnectionFactory connectionFactory,
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
factory.setErrorHandler(errorHandler());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ErrorHandler errorHandler() {
|
||||
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
|
||||
}
|
||||
|
||||
@Bean
|
||||
FatalExceptionStrategy customExceptionStrategy() {
|
||||
return new CustomFatalExceptionStrategy();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "listener-error")
|
||||
public class ListenerErrorHandlerAmqpConfiguration {
|
||||
|
||||
@Bean
|
||||
public ErrorHandler errorHandler() {
|
||||
return new CustomErrorHandler();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
factory.setErrorHandler(errorHandler());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.FanoutExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "routing-dlq")
|
||||
public class RoutingKeyDLQAmqpConfiguration {
|
||||
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
FanoutExchange deadLetterExchange() {
|
||||
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue deadLetterQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding deadLetterBinding() {
|
||||
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package com.baeldung.springamqp.errorhandling.configuration;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueBuilder;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "simple-dlq")
|
||||
public class SimpleDLQAmqpConfiguration {
|
||||
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
|
||||
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
|
||||
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";
|
||||
|
||||
@Bean
|
||||
Queue messagesQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES)
|
||||
.withArgument("x-dead-letter-exchange", "")
|
||||
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
Queue deadLetterQueue() {
|
||||
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
DirectExchange messagesExchange() {
|
||||
return new DirectExchange(EXCHANGE_MESSAGES);
|
||||
}
|
||||
|
||||
@Bean
|
||||
Binding bindingMessages() {
|
||||
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.baeldung.springamqp.errorhandling.consumer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
|
||||
|
||||
public class DLQCustomAmqpContainer {
|
||||
private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class);
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
public static final int MAX_RETRIES_COUNT = 2;
|
||||
|
||||
public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
|
||||
public void processFailedMessagesRetryHeaders(Message failedMessage) {
|
||||
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
|
||||
if (retriesCnt == null)
|
||||
retriesCnt = 1;
|
||||
if (retriesCnt > MAX_RETRIES_COUNT) {
|
||||
log.info("Discarding message");
|
||||
return;
|
||||
}
|
||||
log.info("Retrying message for the {} time", retriesCnt);
|
||||
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
|
||||
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package com.baeldung.springamqp.errorhandling.consumer;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
|
||||
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class MessagesConsumer {
|
||||
public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
|
||||
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class);
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
public MessagesConsumer(RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
|
||||
public void receiveMessage(Message message) throws BusinessException {
|
||||
log.info("Received message: {}", message.toString());
|
||||
throw new BusinessException();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "simple-dlq")
|
||||
public SimpleDLQAmqpContainer simpleAmqpContainer() {
|
||||
return new SimpleDLQAmqpContainer(rabbitTemplate);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "routing-dlq")
|
||||
public RoutingDLQAmqpContainer routingDLQAmqpContainer() {
|
||||
return new RoutingDLQAmqpContainer(rabbitTemplate);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "dlx-custom")
|
||||
public DLQCustomAmqpContainer dlqAmqpContainer() {
|
||||
return new DLQCustomAmqpContainer(rabbitTemplate);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(
|
||||
value = "amqp.configuration.current",
|
||||
havingValue = "parking-lot-dlx")
|
||||
public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() {
|
||||
return new ParkingLotDLQAmqpContainer(rabbitTemplate);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package com.baeldung.springamqp.errorhandling.consumer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
|
||||
|
||||
public class ParkingLotDLQAmqpContainer {
|
||||
private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class);
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
public static final int MAX_RETRIES_COUNT = 2;
|
||||
|
||||
public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
|
||||
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
|
||||
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
|
||||
if (retriesCnt == null)
|
||||
retriesCnt = 1;
|
||||
if (retriesCnt > MAX_RETRIES_COUNT) {
|
||||
log.info("Sending message to the parking lot queue");
|
||||
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
|
||||
return;
|
||||
}
|
||||
log.info("Retrying message for the {} time", retriesCnt);
|
||||
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
|
||||
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_PARKING_LOT)
|
||||
public void processParkingLotQueue(Message failedMessage) {
|
||||
log.info("Received message in parking lot queue {}", failedMessage.toString());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package com.baeldung.springamqp.errorhandling.consumer;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
|
||||
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
|
||||
public class RoutingDLQAmqpContainer {
|
||||
private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class);
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
|
||||
public void receiveMessage(Message message) throws BusinessException {
|
||||
log.info("Received message: {}", message.toString());
|
||||
throw new BusinessException();
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
|
||||
public void processFailedMessagesRequeue(Message failedMessage) {
|
||||
log.info("Received failed message, requeueing: {}", failedMessage.toString());
|
||||
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.baeldung.springamqp.errorhandling.consumer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
|
||||
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
|
||||
|
||||
public class SimpleDLQAmqpContainer {
|
||||
private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class);
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
|
||||
public void processFailedMessages(Message message) {
|
||||
log.info("Received failed message: {}", message.toString());
|
||||
}
|
||||
|
||||
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
|
||||
public void processFailedMessagesRequeue(Message failedMessage) {
|
||||
log.info("Received failed message, requeueing: {}", failedMessage.toString());
|
||||
log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey());
|
||||
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
package com.baeldung.springamqp.errorhandling.errorhandler;
|
||||
|
||||
public class BusinessException extends Exception {
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.baeldung.springamqp.errorhandling.errorhandler;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
public class CustomErrorHandler implements ErrorHandler {
|
||||
|
||||
@Override
|
||||
public void handleError(Throwable t) {
|
||||
if (!(t.getCause() instanceof BusinessException)) {
|
||||
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package com.baeldung.springamqp.errorhandling.errorhandler;
|
||||
|
||||
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
|
||||
|
||||
public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
|
||||
@Override
|
||||
public boolean isFatal(Throwable t) {
|
||||
return !(t.getCause() instanceof BusinessException);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package com.baeldung.springamqp.errorhandling.producer;
|
||||
|
||||
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class MessageProducer {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);
|
||||
private int messageNumber = 0;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
|
||||
public MessageProducer(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
public void sendMessage() {
|
||||
log.info("Sending message...");
|
||||
rabbitTemplate.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
spring.rabbitmq.listener.simple.default-requeue-rejected=false
|
||||
spring.main.allow-bean-definition-overriding=true
|
||||
amqp.configuration.current=parking-lot-dlx
|
Loading…
Reference in New Issue