diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java index d0abf0002d..05a55727e9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java @@ -135,8 +135,9 @@ public class RedeliveryPlugin extends BrokerPluginSupport { Destination regionDestination = (Destination) messageReference.getRegionDestination(); final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(regionDestination.getActiveMQDestination()); if (redeliveryPolicy != null) { + final int maximumRedeliveries = redeliveryPolicy.getMaximumRedeliveries(); int redeliveryCount = messageReference.getRedeliveryCounter(); - if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) { + if (RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES == maximumRedeliveries || redeliveryCount < maximumRedeliveries) { long delay = ( redeliveryCount == 0 ? redeliveryPolicy.getInitialRedeliveryDelay() : @@ -146,7 +147,7 @@ public class RedeliveryPlugin extends BrokerPluginSupport { } else if (isSendToDlqIfMaxRetriesExceeded()) { super.sendToDeadLetterQueue(context, messageReference, subscription); } else { - LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId()); + LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId()); } } else if (isFallbackToDeadLetter()) { super.sendToDeadLetterQueue(context, messageReference, subscription); diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java index c53b3d5892..c6824f7aef 100644 --- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -32,11 +32,13 @@ import org.apache.activemq.util.IntrospectionSupport; public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable { public static final int NO_MAXIMUM_REDELIVERIES = -1; + public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6; + private static Random randomNumberGenerator; // +/-15% for a 30% spread -cgs protected double collisionAvoidanceFactor = 0.15d; - protected int maximumRedeliveries = 6; + protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES; protected long maximumRedeliveryDelay = -1; protected long initialRedeliveryDelay = 1000L; protected boolean useCollisionAvoidance; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java index da746f0b4a..a07f6df6ef 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -40,10 +40,20 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); final String data = "hi"; final long redeliveryDelayMillis = 2000; - final int maxBrokerRedeliveries = 2; + int maxBrokerRedeliveries = 2; public void testScheduledRedelivery() throws Exception { + doTestScheduledRedelivery(maxBrokerRedeliveries, true); + } + public void testInfiniteRedelivery() throws Exception { + maxBrokerRedeliveries = RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES; + doTestScheduledRedelivery(RedeliveryPolicy.DEFAULT_MAXIMUM_REDELIVERIES + 1, false); + } + + public void doTestScheduledRedelivery(int maxBrokerRedeliveriesToValidate, boolean validateDLQ) throws Exception { + + startBroker(true); sendMessage(0); ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection(); @@ -59,7 +69,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { LOG.info("got: " + message); consumerSession.rollback(); - for (int i=0;i