https://issues.apache.org/jira/browse/AMQ-5146 - validate redeliveryPolicy excess pre dispatch, fix and test

This commit is contained in:
gtully 2014-04-16 15:48:27 +01:00
parent e947927511
commit fad1dd0f17
2 changed files with 252 additions and 16 deletions

View File

@ -497,6 +497,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
} else if (redeliveryExceeded(md)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received with excessive redelivered: " + md);
}
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(getConsumerId() + " received message: " + md);
@ -510,6 +515,25 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
private void posionAck(MessageDispatch md, String cause) throws JMSException {
MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
posionAck.setFirstMessageId(md.getMessage().getMessageId());
posionAck.setPoisonCause(new Throwable(cause));
session.sendAck(posionAck);
}
private boolean redeliveryExceeded(MessageDispatch md) {
try {
return redeliveryPolicy != null
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
&& md.getMessage().getProperty("redeliveryDelay") == null;
} catch (IOException ignored) {
return false;
}
}
/**
* Receives the next message produced for this message consumer.
* <P>
@ -1353,6 +1377,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return;
}
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
@ -1386,10 +1414,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else {
if (!session.isTransacted()) {
LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md);
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
poisonAck.setPoisonCause(new Throwable("Duplicate non transacted delivery to " + getConsumerId()));
session.sendAck(poisonAck);
posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
@ -1405,14 +1430,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
if (needsPoisonAck) {
MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+ session.getConnection().getConnectionInfo().getConnectionId()));
LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
+ " consumer on this connection, failoverRedeliveryWaitPeriod="
+ failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck);
session.sendAck(poisonAck);
+ failoverRedeliveryWaitPeriod + ". Message: " + md);
posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+ session.getConnection().getConnectionInfo().getConnectionId());
} else {
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);

View File

@ -16,23 +16,28 @@
*/
package org.apache.activemq;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
/**
* Test cases used to test the JMS message exclusive consumers.
*
*
*/
public class RedeliveryPolicyTest extends JmsTestSupport {
public static Test suite() {
@ -383,6 +388,215 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
connection.start();
Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = dlqSession.createProducer(destination);
// Send the messages
producer.send(dlqSession.createTextMessage("1st"));
dlqSession.commit();
MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
final int maxRedeliveries = 4;
for (int i=0;i<=maxRedeliveries +1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000));
if (i<=maxRedeliveries) {
assertEquals("1st", m.getText());
assertEquals(i, m.getRedeliveryCounter());
} else {
assertNull("null on exceeding redelivery count", m);
}
connection.close();
connections.remove(connection);
}
// We should be able to get the message off the DLQ now.
TextMessage m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
dlqSession.commit();
}
public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
connection.start();
Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = dlqSession.createProducer(destination);
// Send the messages
producer.send(dlqSession.createTextMessage("1st"));
dlqSession.commit();
MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
final int maxRedeliveries = 4;
final AtomicInteger receivedCount = new AtomicInteger(0);
for (int i=0;i<=maxRedeliveries+1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
connection.start();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch done = new CountDownLatch(1);
consumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage)message;
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
done.countDown();
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
if (i<=maxRedeliveries) {
assertTrue("listener done", done.await(5, TimeUnit.SECONDS));
} else {
// final redlivery gets poisoned before dispatch
assertFalse("listener done", done.await(1, TimeUnit.SECONDS));
}
connection.close();
connections.remove(connection);
}
// We should be able to get the message off the DLQ now.
TextMessage m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
dlqSession.commit();
}
public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception {
connection.start();
Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = dlqSession.createProducer(destination);
// Send the messages
producer.send(dlqSession.createTextMessage("1st"));
dlqSession.commit();
MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
final int maxRedeliveries = 4;
final AtomicInteger receivedCount = new AtomicInteger(0);
for (int i=0;i<=maxRedeliveries+1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
connection.start();
final CountDownLatch done = new CountDownLatch(1);
final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
session.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
done.countDown();
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
connection.createConnectionConsumer(
destination,
null,
new ServerSessionPool() {
@Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
@Override
public Session getSession() throws JMSException {
return session;
}
@Override
public void start() throws JMSException {
}
};
}
},
100,
false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
session.run();
return done.await(10, TimeUnit.MILLISECONDS);
}
});
if (i<=maxRedeliveries) {
assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
} else {
// final redlivery gets poisoned before dispatch
assertFalse("listener not done @" + i, done.await(1, TimeUnit.SECONDS));
}
connection.close();
connections.remove(connection);
}
// We should be able to get the message off the DLQ now.
TextMessage m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
dlqSession.commit();
}
public void testInitialRedeliveryDelayZero() throws Exception {