diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java index b9be4466d5..dbb53376b0 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeQueuePrefetchTest.java @@ -17,26 +17,41 @@ package org.apache.activemq.usecases; import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; -/** - * @version $Revision: 1.1.1.1 $ - */ public class ConsumeQueuePrefetchTest extends ConsumeTopicPrefetchTest { - - /** - * TODO disabled failing test cases until we fix queue dispatching - */ - public void testSendDoublePrefetchSize() throws JMSException { - } - - /** - * TODO disabled failing test cases until we fix queue dispatching - */ - public void testSendPrefetchSizePlusOne() throws JMSException { - } - + private static final Log LOG = LogFactory.getLog(ConsumeQueuePrefetchTest.class); + protected void setUp() throws Exception { topic = false; super.setUp(); } + + public void testInflightWithConsumerPerMessage() throws JMSException { + makeMessages(prefetchSize); + + LOG.info("About to send and receive: " + prefetchSize + " on destination: " + destination + + " of type: " + destination.getClass().getName()); + + for (int i = 0; i < prefetchSize; i++) { + Message message = session.createTextMessage(messageTexts[i]); + producer.send(message); + } + + validateConsumerPrefetch(this.getSubject(), prefetchSize); + + // new consumer per 20 messages + for (int i = 0; i < prefetchSize; i+=20) { + consumer.close(); + consumer = session.createConsumer(destination); + validateConsumerPrefetch(this.getSubject(), prefetchSize - i); + for (int j=0; j<20; j++) { + Message message = consumeMessge(i+j); + message.acknowledge(); + } + } + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java index ee76db52b2..5dc3f813dd 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java @@ -22,7 +22,12 @@ import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.policy.IndividualDeadLetterViaXmlTest; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -59,6 +64,8 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { producer.send(message); } + validateConsumerPrefetch(this.getSubject(), prefetchSize); + // lets consume them in two fetch batches for (int i = 0; i < messageCount; i++) { consumeMessge(i); @@ -72,12 +79,13 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { return connection; } - protected void consumeMessge(int i) throws JMSException { + protected TextMessage consumeMessge(int i) throws JMSException { Message message = consumer.receive(consumerTimeout); assertTrue("Should have received a message by now for message: " + i, message != null); assertTrue("Should be a TextMessage: " + message, message instanceof TextMessage); TextMessage textMessage = (TextMessage) message; assertEquals("Message content", messageTexts[i], textMessage.getText()); + return textMessage; } @@ -88,4 +96,27 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { } } + protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException { + RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker(); + for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + final org.apache.activemq.broker.region.Destination target = dest; + if (dest.getName().equals(destination)) { + try { + Wait.waitFor(new Condition() { + public boolean isSatisified() throws Exception { + DestinationStatistics stats = target.getDestinationStatistics(); + LOG.info("inflight for : " + target.getName() + ": " + stats.getInflight().getCount()); + return stats.getInflight().getCount() == expectedCount; + } + }); + } catch (Exception e) { + throw new JMSException(e.toString()); + } + DestinationStatistics stats = dest.getDestinationStatistics(); + LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount()); + assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", + expectedCount, stats.getInflight().getCount()); + } + } + } }