Fix issues, segregate configurations

This commit is contained in:
Alexander Molochko 2019-11-02 23:57:12 +03:00
parent 70d5e7c57d
commit 9829300a62
13 changed files with 267 additions and 71 deletions

View File

@ -11,6 +11,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class ErrorHandlingApp {
@Autowired
MessageProducer messageProducer;

View File

@ -1,19 +1,31 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
//@Configuration
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "dlx-custom")
public class DLXCustomAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean

View File

@ -1,11 +1,23 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
//@Configuration
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "parking-lot-dlx")
public class DLXParkingLotAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
@ -29,8 +41,8 @@ public class DLXParkingLotAmqpConfiguration {
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean

View File

@ -1,12 +1,17 @@
package com.baeldung.springamqp.errorhandling.configuration;
import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;
@ -15,11 +20,15 @@ import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpC
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "fatal-error-strategy")
public class FatalExceptionStrategyAmqpConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
@ -39,7 +48,7 @@ public class FatalExceptionStrategyAmqpConfiguration {
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
.build();
}
@Bean

View File

@ -1,17 +1,26 @@
package com.baeldung.springamqp.errorhandling.configuration;
import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
//@Configuration
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "listener-error")
public class ListenerErrorHandlerAmqpConfiguration {
@Bean
@ -31,7 +40,7 @@ public class ListenerErrorHandlerAmqpConfiguration {
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
.build();
}
@Bean

View File

@ -1,19 +1,31 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
//@Configuration
public class DLXDefaultAmqpConfiguration {
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "routing-dlq")
public class RoutingKeyDLQAmqpConfiguration {
public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}
@Bean

View File

@ -1,9 +1,18 @@
package com.baeldung.springamqp.errorhandling.configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//@Configuration
@Configuration
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "simple-dlq")
public class SimpleDLQAmqpConfiguration {
public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq";
@ -12,9 +21,9 @@ public class SimpleDLQAmqpConfiguration {
@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
}
@Bean

View File

@ -0,0 +1,35 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
public class DLQCustomAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public static final int MAX_RETRIES_COUNT = 2;
public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryHeaders(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null)
retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Discarding message");
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}

View File

@ -7,15 +7,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
@Service
@Configuration
public class MessagesConsumer {
public static final String HEADER_X_RETRIES_COUNT = "x-retries-count";
public static final int MAX_RETRIES_COUNT = 1;
private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class);
private final RabbitTemplate rabbitTemplate;
@ -25,51 +24,40 @@ public class MessagesConsumer {
}
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(final Message message) throws BusinessException {
public void receiveMessage(Message message) throws BusinessException {
log.info("Received message: {}", message.toString());
throw new BusinessException();
}
//@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ)
public void processFailedMessages(final Message message) {
log.info("Received failed message: {}", message.toString());
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "simple-dlq")
public SimpleDLQAmqpContainer simpleAmqpContainer() {
return new SimpleDLQAmqpContainer(rabbitTemplate);
}
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(final Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "routing-dlq")
public RoutingDLQAmqpContainer routingDLQAmqpContainer() {
return new RoutingDLQAmqpContainer(rabbitTemplate);
}
//@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryHeaders(final Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 0;
log.info("Retrying message for the {} time", retriesCnt);
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Discarding message");
return;
}
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "dlx-custom")
public DLQCustomAmqpContainer dlqAmqpContainer() {
return new DLQCustomAmqpContainer(rabbitTemplate);
}
// @RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 0;
log.info("Retrying message for the {} time", retriesCnt);
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Sending message to the parking lot queue");
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
return;
}
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
//@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(final Message failedMessage) {
log.info("Received message in parking lot queue");
@Bean
@ConditionalOnProperty(
value = "amqp.configuration.current",
havingValue = "parking-lot-dlx")
public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() {
return new ParkingLotDLQAmqpContainer(rabbitTemplate);
}
}

View File

@ -0,0 +1,43 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT;
import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT;
public class ParkingLotDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public static final int MAX_RETRIES_COUNT = 2;
public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null)
retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Sending message to the parking lot queue");
rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
log.info("Received message in parking lot queue {}", failedMessage.toString());
}
}

View File

@ -0,0 +1,33 @@
package com.baeldung.springamqp.errorhandling.consumer;
import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration;
import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
public class RoutingDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
log.info("Received message: {}", message.toString());
throw new BusinessException();
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.springamqp.errorhandling.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES;
import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ;
public class SimpleDLQAmqpContainer {
private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class);
private final RabbitTemplate rabbitTemplate;
public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
log.info("Received failed message: {}", message.toString());
}
@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey());
rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}
}

View File

@ -1 +1,3 @@
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.main.allow-bean-definition-overriding=true
amqp.configuration.current=parking-lot-dlx