diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 0875273de2..bd6478a960 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -38,6 +40,9 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; @@ -218,6 +223,38 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { } return null; } + + protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + Connection conn = brokerItem.createConnection(); + conn.start(); + ConsumerEventSource ces = new ConsumerEventSource(conn, destination); + + try { + final AtomicInteger actualConnected = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + ces.setConsumerListener(new ConsumerListener(){ + public void onConsumerEvent(ConsumerEvent event) { + if( actualConnected.get() < count ) { + actualConnected.set(event.getConsumerCount()); + } + if( event.getConsumerCount() >= count ) { + latch.countDown(); + } + } + }); + ces.start(); + + latch.await(timeout, TimeUnit.MILLISECONDS); + assertTrue("Expected at least "+count+" consumers to connect, but only "+actualConnected.get()+" connectect within "+timeout+" ms", actualConnected.get() >= count); + + } finally { + ces.stop(); + conn.close(); + brokerItem.connections.remove(conn); + } + } + protected void sendMessages(String brokerName, Destination destination, int count) throws Exception { BrokerItem brokerItem = brokers.get(brokerName); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java index 139fb1e1d7..bc59576679 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java @@ -56,7 +56,10 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport } // wait for consumers to get propagated - Thread.sleep(5000); + for (int i = 1; i <= BROKER_COUNT; i++) { + // all consumers on the remote brokers look like 1 consumer to the local broker. + assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000); + } // Send messages for (int i = 1; i <= BROKER_COUNT; i++) {