diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 6d3beb017b..45843620d8 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -502,6 +502,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } else if (redeliveryExceeded(md)) { LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md); posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); + if (timeout > 0) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + sendPullCommand(timeout); } else { if (LOG.isTraceEnabled()) { LOG.trace(getConsumerId() + " received message: " + md); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java index b34fe7f739..a98263b054 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java @@ -16,43 +16,56 @@ */ package org.apache.activemq; -import javax.jms.Connection; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; -/** - * - */ -public class ConsumerReceiveWithTimeoutTest extends TestSupport { +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; - private Connection connection; +public class ConsumerReceiveWithTimeoutTest { + + private ActiveMQConnection connection; + private BrokerService broker; + private String connectionUri; + + @Before + public void setUp() throws Exception { + createBroker(); - protected void setUp() throws Exception { - super.setUp(); connection = createConnection(); } - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { if (connection != null) { - connection.close(); - connection = null; + try { + connection.close(); + } catch (Exception e) {} + } + + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); } - super.tearDown(); } /** * Test to check if consumer thread wakes up inside a receive(timeout) after * a message is dispatched to the consumer - * + * * @throws javax.jms.JMSException */ + @Test(timeout = 30000) public void testConsumerReceiveBeforeMessageDispatched() throws JMSException { connection.start(); @@ -61,6 +74,7 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport { final Queue queue = session.createQueue("test"); Thread t = new Thread() { + @Override public void run() { try { // wait for 10 seconds to allow consumer.receive to be run @@ -81,7 +95,67 @@ public class ConsumerReceiveWithTimeoutTest extends TestSupport { Message msg = consumer.receive(60000); assertNotNull(msg); session.close(); - } + /** + * check if receive(timeout) does timeout when prefetch=0 and redeliveries=0 + *
+ * send a message. + * consume and rollback to ensure redeliverCount is incremented + * try to consume message with a timeout. + */ + @Test(timeout=20000) + public void testConsumerReceivePrefetchZeroRedeliveryZero() throws Exception { + + connection.start(); + + // push message to queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test.prefetch.zero"); + MessageProducer producer = session.createProducer(queue); + TextMessage textMessage = session.createTextMessage("test Message"); + producer.send(textMessage); + session.close(); + + // consume and rollback - increase redelivery counter on message + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(2000); + assertNotNull(message); + session.rollback(); + session.close(); + + // Reconnect with zero prefetch and zero redeliveries allowed. + connection.close(); + connection = createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(0); + connection.getRedeliveryPolicy().setMaximumRedeliveries(0); + connection.start(); + + // try consume with timeout - expect it to timeout and return NULL message + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(queue); + message = consumer.receive(3000); + + assertNull(message); + } + + private void createBroker() throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + protected ActiveMQConnection createConnection() throws Exception { + return (ActiveMQConnection) createConnectionFactory().createConnection(); + } }