diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 66938c5be9..d34f5ca8cf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -41,19 +41,20 @@ import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class); + private final ActiveMQDestination destination = new ActiveMQQueue("test"); - BrokerService broker; - Connection connection; - Session session; - MessageProducer producer; - public ActiveMQDestination destination = new ActiveMQQueue("test"); - public boolean optimizedDispatch = true; - public PendingQueueMessageStoragePolicy pendingQueuePolicy; + private boolean optimizedDispatch = true; + private PendingQueueMessageStoragePolicy pendingQueuePolicy; + + private BrokerService broker; + private String connectionUri; + private Connection connection; + private Session session; + private MessageProducer producer; public static Test suite() { return suite(ExpiredMessagesWithNoConsumerTest.class); @@ -76,11 +77,11 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { broker.setBrokerName("localhost"); broker.setUseJmx(true); broker.setDeleteAllMessagesOnStartup(true); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:0"); PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setOptimizedDispatch(optimizedDispatch ); + defaultEntry.setOptimizedDispatch(optimizedDispatch); defaultEntry.setExpireMessagesPeriod(800); defaultEntry.setMaxExpirePageSize(800); @@ -93,11 +94,12 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { } policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); - broker.start(); - broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); } public void initCombosForTestExpiredMessagesWithNoConsumer() { @@ -109,7 +111,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { createBrokerWithMemoryLimit(); - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); @@ -139,7 +141,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - producingThread.join(1000); + producingThread.join(TimeUnit.SECONDS.toMillis(1000)); return !producingThread.isAlive(); } })); @@ -157,15 +159,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - assertEquals("All sent have expired", sendCount, view.getExpiredCount()); - assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage()); + assertEquals("Not all sent messages have expired", sendCount, view.getExpiredCount()); + assertEquals("memory usage doesn't go to duck egg", 0, view.getMemoryPercentUsage()); } // first ack delivered after expiry public void testExpiredMessagesWithVerySlowConsumer() throws Exception { createBroker(); final long queuePrefetch = 600; - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); producer = session.createProducer(destination); @@ -183,7 +186,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { try { LOG.info("Got my message: " + message); receivedOneCondition.countDown(); - waitCondition.await(60, TimeUnit.SECONDS); + waitCondition.await(6, TimeUnit.MINUTES); LOG.info("acking message: " + message); message.acknowledge(); } catch (Exception e) { @@ -195,7 +198,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { connection.start(); - final Thread producingThread = new Thread("Producing Thread") { public void run() { try { @@ -222,7 +224,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { producingThread.join(1000); return !producingThread.isAlive(); } - }, Wait.MAX_WAIT_MILLIS * 2)); + }, Wait.MAX_WAIT_MILLIS * 10)); final DestinationViewMBean view = createView(destination); @@ -231,7 +233,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { return queuePrefetch == view.getDispatchCount(); } })); - assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() { + assertTrue("Not all sent have expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return sendCount == view.getExpiredCount(); } @@ -255,10 +257,10 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - - assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount()); - assertEquals("size gets back to 0 ", 0, view.getQueueSize()); - assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); + assertEquals("inflight didn't reduce to half prefetch minus single delivered message", + (queuePrefetch/2) -1, view.getInFlightCount()); + assertEquals("size didn't get back to 0 ", 0, view.getQueueSize()); + assertEquals("dequeues didn't match sent/expired ", sendCount, view.getDequeueCount()); consumer.close(); @@ -275,7 +277,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { public void testExpiredMessagesWithVerySlowConsumerCanContinue() throws Exception { createBroker(); final long queuePrefetch = 600; - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + connectionUri + "?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); producer = session.createProducer(destination); @@ -291,11 +294,15 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { public void onMessage(Message message) { try { - LOG.info("Got my message: " + message); + if(LOG.isDebugEnabled()) { + LOG.debug("Got my message: " + message); + } receivedOneCondition.countDown(); received.incrementAndGet(); - waitCondition.await(60, TimeUnit.SECONDS); - LOG.info("acking message: " + message); + waitCondition.await(5, TimeUnit.MINUTES); + if(LOG.isDebugEnabled()) { + LOG.debug("acking message: " + message); + } message.acknowledge(); } catch (Exception e) { e.printStackTrace(); @@ -306,7 +313,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { connection.start(); - final Thread producingThread = new Thread("Producing Thread") { public void run() { try { @@ -333,16 +339,16 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { producingThread.join(1000); return !producingThread.isAlive(); } - })); + }, Wait.MAX_WAIT_MILLIS * 10)); final DestinationViewMBean view = createView(destination); - assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { + assertTrue("Not all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return queuePrefetch == view.getDispatchCount(); } })); - assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() { + assertTrue("All have not sent have expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return sendCount == view.getExpiredCount(); } @@ -366,16 +372,20 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - - assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount()); - assertEquals("size gets back to 0 ", 0, view.getQueueSize()); - assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); - + assertEquals("inflight didn't reduce to half prefetch minus single delivered message", + (queuePrefetch/2) -1, view.getInFlightCount()); + assertEquals("size doesn't get back to 0 ", 0, view.getQueueSize()); + assertEquals("dequeues don't match sent/expired ", sendCount, view.getDequeueCount()); // produce some more producer.setTimeToLive(0); + long tStamp = System.currentTimeMillis(); for (int i=0; i= sendCount; @@ -458,11 +463,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { assertEquals(0, received.get()); assertEquals(10, view.getExpiredCount()); assertEquals(0, view.getEnqueueCount()); - } - - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name; @@ -481,7 +483,19 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { broker.waitUntilStopped(); } + public boolean getOptimizedDispatch() { + return this.optimizedDispatch; + } + public void setOptimizedDispatch(boolean option) { + this.optimizedDispatch = option; + } + public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { + return this.pendingQueuePolicy; + } + public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy policy) { + this.pendingQueuePolicy = policy; + } }