diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java index a0adb064e5..6beb710a0a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java @@ -28,12 +28,10 @@ import javax.jms.Destination; import javax.jms.MessageConsumer; import org.apache.activemq.JmsMultipleBrokersTestSupport; -import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; @@ -50,7 +48,12 @@ import org.apache.commons.logging.LogFactory; public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { private static final Log LOG = LogFactory.getLog(ThreeBrokerQueueNetworkTest.class); protected static final int MESSAGE_COUNT = 100; + private static final long MAX_WAIT_MILLIS = 10000; + interface Condition { + boolean isSatisified() throws Exception; + } + /** * BrokerA -> BrokerB -> BrokerC */ @@ -280,14 +283,31 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { Thread.sleep(1000); // Get message count - MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); + final MessageIdList msgsA = getConsumerMessages("BrokerA", clientA); MessageIdList msgsB = getConsumerMessages("BrokerB", clientB); MessageIdList msgsC = getConsumerMessages("BrokerC", clientC); + waitFor(new Condition() { + public boolean isSatisified() { + return msgsA.getMessageCount() == MESSAGE_COUNT; + } + }); + assertEquals(MESSAGE_COUNT * 3, msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()); } - + // on slow machines some more waiting is required on account of slow advisories + private void waitFor(Condition condition) throws Exception { + final long expiry = System.currentTimeMillis() + MAX_WAIT_MILLIS; + while (!condition.isSatisified() && System.currentTimeMillis() < expiry) { + Thread.sleep(1000); + } + if (System.currentTimeMillis() >= expiry) { + LOG.error("expired while waiting for condition " + condition); + } + + } + public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception { bridgeAllBrokers("default", 3, false); startAllBrokers(); @@ -551,8 +571,13 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport { } } - private void verifyConsumerCount(BrokerService broker, int count, Destination dest) throws Exception { - RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + private void verifyConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception { + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + waitFor(new Condition() { + public boolean isSatisified() throws Exception { + return !regionBroker.getDestinations(ActiveMQDestination.transform(dest)).isEmpty(); + } + }); Queue internalQueue = (Queue) regionBroker.getDestinations(ActiveMQDestination.transform(dest)).iterator().next(); assertEquals("consumer count on " + broker.getBrokerName() + " matches for q: " + internalQueue, count, internalQueue.getConsumers().size()); }