mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4362 - Broker-based redelivery plugin - support for maximumRedeliveries="-1"
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1454330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2d46399ff3
commit
fc15f130d1
|
@ -135,8 +135,9 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
|
||||||
Destination regionDestination = (Destination) messageReference.getRegionDestination();
|
Destination regionDestination = (Destination) messageReference.getRegionDestination();
|
||||||
final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(regionDestination.getActiveMQDestination());
|
final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(regionDestination.getActiveMQDestination());
|
||||||
if (redeliveryPolicy != null) {
|
if (redeliveryPolicy != null) {
|
||||||
|
final int maximumRedeliveries = redeliveryPolicy.getMaximumRedeliveries();
|
||||||
int redeliveryCount = messageReference.getRedeliveryCounter();
|
int redeliveryCount = messageReference.getRedeliveryCounter();
|
||||||
if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) {
|
if (RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES == maximumRedeliveries || redeliveryCount < maximumRedeliveries) {
|
||||||
|
|
||||||
long delay = ( redeliveryCount == 0 ?
|
long delay = ( redeliveryCount == 0 ?
|
||||||
redeliveryPolicy.getInitialRedeliveryDelay() :
|
redeliveryPolicy.getInitialRedeliveryDelay() :
|
||||||
|
@ -146,7 +147,7 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
|
||||||
} else if (isSendToDlqIfMaxRetriesExceeded()) {
|
} else if (isSendToDlqIfMaxRetriesExceeded()) {
|
||||||
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
|
LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId());
|
||||||
}
|
}
|
||||||
} else if (isFallbackToDeadLetter()) {
|
} else if (isFallbackToDeadLetter()) {
|
||||||
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
||||||
|
|
|
@ -32,11 +32,13 @@ import org.apache.activemq.util.IntrospectionSupport;
|
||||||
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
|
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
|
||||||
|
|
||||||
public static final int NO_MAXIMUM_REDELIVERIES = -1;
|
public static final int NO_MAXIMUM_REDELIVERIES = -1;
|
||||||
|
public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6;
|
||||||
|
|
||||||
private static Random randomNumberGenerator;
|
private static Random randomNumberGenerator;
|
||||||
|
|
||||||
// +/-15% for a 30% spread -cgs
|
// +/-15% for a 30% spread -cgs
|
||||||
protected double collisionAvoidanceFactor = 0.15d;
|
protected double collisionAvoidanceFactor = 0.15d;
|
||||||
protected int maximumRedeliveries = 6;
|
protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES;
|
||||||
protected long maximumRedeliveryDelay = -1;
|
protected long maximumRedeliveryDelay = -1;
|
||||||
protected long initialRedeliveryDelay = 1000L;
|
protected long initialRedeliveryDelay = 1000L;
|
||||||
protected boolean useCollisionAvoidance;
|
protected boolean useCollisionAvoidance;
|
||||||
|
|
|
@ -40,10 +40,20 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
|
final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
|
||||||
final String data = "hi";
|
final String data = "hi";
|
||||||
final long redeliveryDelayMillis = 2000;
|
final long redeliveryDelayMillis = 2000;
|
||||||
final int maxBrokerRedeliveries = 2;
|
int maxBrokerRedeliveries = 2;
|
||||||
|
|
||||||
public void testScheduledRedelivery() throws Exception {
|
public void testScheduledRedelivery() throws Exception {
|
||||||
|
doTestScheduledRedelivery(maxBrokerRedeliveries, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInfiniteRedelivery() throws Exception {
|
||||||
|
maxBrokerRedeliveries = RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES;
|
||||||
|
doTestScheduledRedelivery(RedeliveryPolicy.DEFAULT_MAXIMUM_REDELIVERIES + 1, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestScheduledRedelivery(int maxBrokerRedeliveriesToValidate, boolean validateDLQ) throws Exception {
|
||||||
|
|
||||||
|
startBroker(true);
|
||||||
sendMessage(0);
|
sendMessage(0);
|
||||||
|
|
||||||
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
||||||
|
@ -59,7 +69,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
LOG.info("got: " + message);
|
LOG.info("got: " + message);
|
||||||
consumerSession.rollback();
|
consumerSession.rollback();
|
||||||
|
|
||||||
for (int i=0;i<maxBrokerRedeliveries;i++) {
|
for (int i=0;i<maxBrokerRedeliveriesToValidate;i++) {
|
||||||
Message shouldBeNull = consumer.receive(500);
|
Message shouldBeNull = consumer.receive(500);
|
||||||
assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
|
assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
|
||||||
|
|
||||||
|
@ -74,15 +84,25 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
consumerSession.rollback();
|
consumerSession.rollback();
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate DLQ
|
if (validateDLQ) {
|
||||||
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
|
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
|
||||||
Message dlqMessage = dlqConsumer.receive(2000);
|
Message dlqMessage = dlqConsumer.receive(2000);
|
||||||
assertNotNull("Got message from dql", dlqMessage);
|
assertNotNull("Got message from dql", dlqMessage);
|
||||||
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
|
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
|
||||||
consumerSession.commit();
|
consumerSession.commit();
|
||||||
|
} else {
|
||||||
|
// consume/commit ok
|
||||||
|
message = consumer.receive(3000);
|
||||||
|
assertNotNull("got message", message);
|
||||||
|
assertEquals("redeliveries accounted for", maxBrokerRedeliveriesToValidate + 2, message.getLongProperty("JMSXDeliveryCount"));
|
||||||
|
consumerSession.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerConnection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNoScheduledRedeliveryOfExpired() throws Exception {
|
public void testNoScheduledRedeliveryOfExpired() throws Exception {
|
||||||
|
startBroker(true);
|
||||||
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
||||||
consumerConnection.start();
|
consumerConnection.start();
|
||||||
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
@ -155,12 +175,6 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
return new ActiveMQConnectionFactory("vm://localhost");
|
return new ActiveMQConnectionFactory("vm://localhost");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
startBroker(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
stopBroker();
|
stopBroker();
|
||||||
|
|
Loading…
Reference in New Issue