diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a52e2d4347..0bf1adef8e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -506,7 +506,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC sendPullCommand(timeout); } else if (redeliveryExceeded(md)) { LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); - posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); if (timeout > 0) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); } @@ -539,6 +539,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC try { return session.getTransacted() && redeliveryPolicy != null + && redeliveryPolicy.isPreDispatchCheck() && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin @@ -1255,7 +1256,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); ack.setFirstMessageId(firstMsgId); - ack.setPoisonCause(new Throwable("Exceeded redelivery policy limit:" + redeliveryPolicy + ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause())); session.sendAck(ack,true); // Adjust the window size. @@ -1392,7 +1393,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (listener != null && unconsumedMessages.isRunning()) { if (redeliveryExceeded(md)) { - posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); return; } ActiveMQMessage message = createActiveMQMessage(md); diff --git a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java index 91f2b71923..1ced50755c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -45,6 +45,7 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, protected boolean useExponentialBackOff; protected double backOffMultiplier = 5.0; protected long redeliveryDelay = initialRedeliveryDelay; + protected boolean preDispatchCheck = true; public RedeliveryPolicy() { } @@ -156,4 +157,12 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, public String toString() { return IntrospectionSupport.toString(this, DestinationMapEntry.class, null); } + + public void setPreDispatchCheck(boolean preDispatchCheck) { + this.preDispatchCheck = preDispatchCheck; + } + + public boolean isPreDispatchCheck() { + return preDispatchCheck; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java index 62f4995f30..cdecfce78e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java @@ -350,6 +350,7 @@ public class MessageListenerRedeliveryTest { assertTrue("is correct exception", cause.contains(getTestName())); assertTrue("cause exception is remembered", cause.contains("Throwable")); assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy")); + assertTrue("cause redelivered count is remembered", cause.contains("[" + (maxDeliveries+1) +"]")); session.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index ac81a1f410..c403781596 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -38,6 +38,8 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.transport.vm.VMTransportFactory; +import org.apache.activemq.transport.vm.VMTransportServer; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,7 +248,9 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertNotNull(m); assertEquals("1st", m.getText()); String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); - assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[3]")); + session.commit(); } @@ -451,9 +455,134 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertNotNull("Got message from DLQ", m); assertEquals("1st", m.getText()); String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); - assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Delivery[5]")); + dlqSession.commit(); + + } + + + public void testRepeatedRedeliveryBrokerCloseReceiveNoCommit() throws Exception { + + connection.start(); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = session.createProducer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + + session.commit(); + + final int maxRedeliveries = 4; + for (int i=0;i<=maxRedeliveries +1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + + ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000)); + if (i<=maxRedeliveries) { + assertEquals("1st", m.getText()); + assertEquals(i, m.getRedeliveryCounter()); + } else { + assertNull("null on exceeding redelivery count", m); + + assertTrue("message in dlq", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("total dequeue count: " + broker.getAdminView().getTotalDequeueCount()); + return broker.getAdminView().getTotalDequeueCount() == 1; + } + })); + } + + // abortive close via broker + for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) { + transportServer.stop(); + } + + try { + connection.close(); + } catch (Exception expected) { + } finally { + connections.remove(connection); + } + } + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connection.start(); + connections.add(connection); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref: " + cause, cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has pre dispatch and count:" + cause, cause.contains("Dispatch[5]")); + + dlqSession.commit(); + + } + + public void testRepeatedRedeliveryReceiveBrokerCloseNoPreDispatchCheck() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + session.commit(); + + final int maxRedeliveries = 4; + for (int i=0;i<=maxRedeliveries + 1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + policy.setPreDispatchCheck(false); + + connection.start(); + session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + + ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000)); + assertNotNull("got message on i=" + i, m); + assertEquals("1st", m.getText()); + assertEquals(i, m.getRedeliveryCounter()); + + // abortive close via broker + for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) { + transportServer.stop(); + } + + try { + connection.close(); + } catch (Exception expected) { + } finally { + connections.remove(connection); + } + } } @@ -518,7 +647,10 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertNotNull("Got message from DLQ", m); assertEquals("1st", m.getText()); String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + LOG.info("cause: " + cause); assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + assertTrue("cause exception has redelivered count ref: " + cause, cause.contains("[5]")); + dlqSession.commit(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java index 916a655d45..9971e8efe5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -74,7 +74,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { for (int i = 0; i < maxBrokerRedeliveriesToValidate; i++) { Message shouldBeNull = consumer.receive(500); - assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull); + assertNull("did not get message early: " + shouldBeNull, shouldBeNull); TimeUnit.SECONDS.sleep(4); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java index 414b70d7d8..9fc20fe73a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/RedeliveryPluginHeaderTest.java @@ -70,7 +70,7 @@ public class RedeliveryPluginHeaderTest extends TestCase { //pushed message to broker ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( - transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0"); + transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0&jms.redeliveryPolicy.preDispatchCheck=true"); Connection connection = factory.createConnection(); connection.start();