diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java index cd250da96d..804d4681d2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -90,7 +90,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport { // Wait for all producers to finish sending synchronized (producerLock) { while (producerLock.get() != 0) { - producerLock.wait(); + producerLock.wait(2000); } } @@ -169,16 +169,16 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport { protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception { conn.setClientID(name); connections.add(conn); + conn.start(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name); - conn.start(); return consumer; } - protected void waitForAllMessagesToBeReceived(int timeout) throws Exception { - //Thread.sleep(timeout); + protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception { + allMessagesList.waitForMessagesToArrive(messageCount); } protected ActiveMQDestination createDestination() throws JMSException { @@ -233,7 +233,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport { protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) { MessageList messageList = (MessageList)consumers.get(consumer); - messageList.assertMessagesReceived(msgCount); + messageList.assertMessagesReceivedNoWait(msgCount); } protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) { @@ -255,7 +255,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport { } protected void assertTotalMessagesReceived(int msgCount) { - allMessagesList.assertMessagesReceived(msgCount); + allMessagesList.assertMessagesReceivedNoWait(msgCount); // now lets count the individual messages received int totalMsg = 0; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java index 0b9ae65e6f..44117cc4f0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java @@ -137,11 +137,11 @@ public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { startConsumers(consumerFactory, dest); // Wait for consumers to setup - Thread.sleep(500); + Thread.sleep(1000); startProducers(dest, messageCount); // Wait for messages to be received. Make it proportional to the messages delivered. - waitForAllMessagesToBeReceived((producerCount * messageCount) / 2000); + waitForAllMessagesToBeReceived(messageCount * producerCount); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java b/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java index f35a402864..6b6f674eec 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java @@ -43,6 +43,7 @@ public class MessageList extends Assert implements MessageListener { private Object semaphore; private boolean verbose; private MessageListener parent; + private long maximumDuration = 15000L; public MessageList() { this(new Object()); @@ -134,6 +135,10 @@ public class MessageList extends Assert implements MessageListener { if (hasReceivedMessages(messageCount)) { break; } + long duration = System.currentTimeMillis() - start; + if (duration > maximumDuration ) { + break; + } synchronized (semaphore) { semaphore.wait(4000); } @@ -144,28 +149,43 @@ public class MessageList extends Assert implements MessageListener { } long end = System.currentTimeMillis() - start; - System.out.println("End of wait for " + end + " millis"); + System.out.println("End of wait for " + end + " millis and received: " + getMessageCount() + " messages"); } /** * Performs a testing assertion that the correct number of messages have - * been received + * been received without waiting + * + * @param messageCount + */ + public void assertMessagesReceivedNoWait(int messageCount) { + assertEquals("expected number of messages when received", messageCount, getMessageCount()); + } + + /** + * Performs a testing assertion that the correct number of messages have + * been received waiting for the messages to arrive up to a fixed amount of time. * * @param messageCount */ public void assertMessagesReceived(int messageCount) { waitForMessagesToArrive(messageCount); - assertEquals("expected number of messages when received: " + getMessages(), messageCount, getMessageCount()); + assertMessagesReceivedNoWait(messageCount); } + /** + * Asserts that there are at least the given number of messages received without waiting. + */ public void assertAtLeastMessagesReceived(int messageCount) { - waitForMessagesToArrive(messageCount); - int actual = getMessageCount(); assertTrue("at least: " + messageCount + " messages received. Actual: " + actual, actual >= messageCount); } + /** + * Asserts that there are at most the number of messages received without waiting + * @param messageCount + */ public void assertAtMostMessagesReceived(int messageCount) { int actual = getMessageCount(); assertTrue("at most: " + messageCount + " messages received. Actual: " + actual, actual <= messageCount);