diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index d862f708d5..89dbc81778 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -497,6 +497,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (timeout > 0) { timeout = Math.max(deadline - System.currentTimeMillis(), 0); } + } else if (redeliveryExceeded(md)) { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " received with excessive redelivered: " + md); + } + posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); } else { if (LOG.isTraceEnabled()) { LOG.trace(getConsumerId() + " received message: " + md); @@ -510,6 +515,25 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + private void posionAck(MessageDispatch md, String cause) throws JMSException { + MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); + posionAck.setFirstMessageId(md.getMessage().getMessageId()); + posionAck.setPoisonCause(new Throwable(cause)); + session.sendAck(posionAck); + } + + private boolean redeliveryExceeded(MessageDispatch md) { + try { + return redeliveryPolicy != null + && redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && md.getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries() + // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin + && md.getMessage().getProperty("redeliveryDelay") == null; + } catch (IOException ignored) { + return false; + } + } + /** * Receives the next message produced for this message consumer. *

@@ -1353,6 +1377,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!unconsumedMessages.isClosed()) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (listener != null && unconsumedMessages.isRunning()) { + if (redeliveryExceeded(md)) { + posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + return; + } ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); try { @@ -1386,10 +1414,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } else { if (!session.isTransacted()) { LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md); - MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); - poisonAck.setFirstMessageId(md.getMessage().getMessageId()); - poisonAck.setPoisonCause(new Throwable("Duplicate non transacted delivery to " + getConsumerId())); - session.sendAck(poisonAck); + posionAck(md, "Duplicate non transacted delivery to " + getConsumerId()); } else { if (LOG.isDebugEnabled()) { LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage()); @@ -1405,14 +1430,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } if (needsPoisonAck) { - MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); - poisonAck.setFirstMessageId(md.getMessage().getMessageId()); - poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: " - + session.getConnection().getConnectionInfo().getConnectionId())); LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" + " consumer on this connection, failoverRedeliveryWaitPeriod=" - + failoverRedeliveryWaitPeriod + ". Message: " + md + ", poisonAck: " + poisonAck); - session.sendAck(poisonAck); + + failoverRedeliveryWaitPeriod + ". Message: " + md); + posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " + + session.getConnection().getConnectionInfo().getConnectionId()); } else { if (transactedIndividualAck) { immediateIndividualTransactedAck(md); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 02db378c7e..e2b58677c3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -16,23 +16,28 @@ */ package org.apache.activemq; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.TextMessage; - import junit.framework.Test; import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; -/** - * Test cases used to test the JMS message exclusive consumers. - * - * - */ public class RedeliveryPolicyTest extends JmsTestSupport { public static Test suite() { @@ -383,6 +388,215 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + public void testRepeatedRedeliveryReceiveNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + for (int i=0;i<=maxRedeliveries +1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + + ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000)); + if (i<=maxRedeliveries) { + assertEquals("1st", m.getText()); + assertEquals(i, m.getRedeliveryCounter()); + } else { + assertNull("null on exceeding redelivery count", m); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + + + public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + final AtomicInteger receivedCount = new AtomicInteger(0); + + for (int i=0;i<=maxRedeliveries+1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + final CountDownLatch done = new CountDownLatch(1); + + consumer.setMessageListener(new MessageListener(){ + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage)message; + assertEquals("1st", m.getText()); + assertEquals(receivedCount.get(), m.getRedeliveryCounter()); + receivedCount.incrementAndGet(); + done.countDown(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + if (i<=maxRedeliveries) { + assertTrue("listener done", done.await(5, TimeUnit.SECONDS)); + } else { + // final redlivery gets poisoned before dispatch + assertFalse("listener done", done.await(1, TimeUnit.SECONDS)); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + + public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + final AtomicInteger receivedCount = new AtomicInteger(0); + + for (int i=0;i<=maxRedeliveries+1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + final CountDownLatch done = new CountDownLatch(1); + + final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED); + session.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage) message; + assertEquals("1st", m.getText()); + assertEquals(receivedCount.get(), m.getRedeliveryCounter()); + receivedCount.incrementAndGet(); + done.countDown(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + connection.createConnectionConsumer( + destination, + null, + new ServerSessionPool() { + @Override + public ServerSession getServerSession() throws JMSException { + return new ServerSession() { + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + } + }; + } + }, + 100, + false); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + session.run(); + return done.await(10, TimeUnit.MILLISECONDS); + } + }); + + if (i<=maxRedeliveries) { + assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS)); + } else { + // final redlivery gets poisoned before dispatch + assertFalse("listener not done @" + i, done.await(1, TimeUnit.SECONDS)); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } public void testInitialRedeliveryDelayZero() throws Exception {