[AMQ-6517] make pre dispatch redelivery policy check optional jms.redeliveryPolicy.preDispatchCheck=false

This commit is contained in:
gtully 2016-11-24 17:02:37 +00:00
parent 8bc3ee29cd
commit b6bca3976c
6 changed files with 150 additions and 7 deletions

View File

@ -506,7 +506,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
sendPullCommand(timeout);
} else if (redeliveryExceeded(md)) {
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
@ -539,6 +539,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
try {
return session.getTransacted()
&& redeliveryPolicy != null
&& redeliveryPolicy.isPreDispatchCheck()
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
@ -1255,7 +1256,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
session.sendAck(ack,true);
// Adjust the window size.
@ -1392,7 +1393,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return;
}
ActiveMQMessage message = createActiveMQMessage(md);

View File

@ -45,6 +45,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
protected boolean useExponentialBackOff;
protected double backOffMultiplier = 5.0;
protected long redeliveryDelay = initialRedeliveryDelay;
protected boolean preDispatchCheck = true;
public RedeliveryPolicy() {
}
@ -156,4 +157,12 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
public String toString() {
return IntrospectionSupport.toString(this, DestinationMapEntry.class, null);
}
public void setPreDispatchCheck(boolean preDispatchCheck) {
this.preDispatchCheck = preDispatchCheck;
}
public boolean isPreDispatchCheck() {
return preDispatchCheck;
}
}

View File

@ -350,6 +350,7 @@ public class MessageListenerRedeliveryTest {
assertTrue("is correct exception", cause.contains(getTestName()));
assertTrue("cause exception is remembered", cause.contains("Throwable"));
assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
assertTrue("cause redelivered count is remembered", cause.contains("[" + (maxDeliveries+1) +"]"));
session.close();
}

View File

@ -38,6 +38,8 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.transport.vm.VMTransportServer;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -246,7 +248,9 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull(m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[3]"));
session.commit();
}
@ -451,9 +455,134 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Delivery[5]"));
dlqSession.commit();
}
public void testRepeatedRedeliveryBrokerCloseReceiveNoCommit() throws Exception {
connection.start();
ActiveMQQueue destination = new ActiveMQQueue("TEST");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
session.commit();
final int maxRedeliveries = 4;
for (int i=0;i<=maxRedeliveries +1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
connection.start();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
if (i<=maxRedeliveries) {
assertEquals("1st", m.getText());
assertEquals(i, m.getRedeliveryCounter());
} else {
assertNull("null on exceeding redelivery count", m);
assertTrue("message in dlq", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("total dequeue count: " + broker.getAdminView().getTotalDequeueCount());
return broker.getAdminView().getTotalDequeueCount() == 1;
}
}));
}
// abortive close via broker
for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
transportServer.stop();
}
try {
connection.close();
} catch (Exception expected) {
} finally {
connections.remove(connection);
}
}
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connection.start();
connections.add(connection);
Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
// We should be able to get the message off the DLQ now.
TextMessage m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Dispatch[5]"));
dlqSession.commit();
}
public void testRepeatedRedeliveryReceiveBrokerCloseNoPreDispatchCheck() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
session.commit();
final int maxRedeliveries = 4;
for (int i=0;i<=maxRedeliveries + 1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
policy.setPreDispatchCheck(false);
connection.start();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
assertNotNull("got message on i=" + i, m);
assertEquals("1st", m.getText());
assertEquals(i, m.getRedeliveryCounter());
// abortive close via broker
for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
transportServer.stop();
}
try {
connection.close();
} catch (Exception expected) {
} finally {
connections.remove(connection);
}
}
}
@ -518,7 +647,10 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
LOG.info("cause: " + cause);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[5]"));
dlqSession.commit();
}

View File

@ -74,7 +74,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
for (int i = 0; i < maxBrokerRedeliveriesToValidate; i++) {
Message shouldBeNull = consumer.receive(500);
assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
assertNull("did not get message early: " + shouldBeNull, shouldBeNull);
TimeUnit.SECONDS.sleep(4);

View File

@ -70,7 +70,7 @@ public class RedeliveryPluginHeaderTest extends TestCase {
//pushed message to broker
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0");
transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0&jms.redeliveryPolicy.preDispatchCheck=true");
Connection connection = factory.createConnection();
connection.start();