mirror of https://github.com/apache/activemq.git
Making the MultiBrokersMultiClientsTest less timing dependent. Using advisories to know when the subscriptions have been setup so
that we can start the publishing at the right time. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@643407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2a328ed27c
commit
a818270040
|
@ -26,6 +26,8 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
@ -38,6 +40,9 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
|
|
||||||
|
import org.apache.activemq.advisory.ConsumerEvent;
|
||||||
|
import org.apache.activemq.advisory.ConsumerEventSource;
|
||||||
|
import org.apache.activemq.advisory.ConsumerListener;
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
@ -219,6 +224,38 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception {
|
||||||
|
BrokerItem brokerItem = brokers.get(brokerName);
|
||||||
|
Connection conn = brokerItem.createConnection();
|
||||||
|
conn.start();
|
||||||
|
ConsumerEventSource ces = new ConsumerEventSource(conn, destination);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final AtomicInteger actualConnected = new AtomicInteger();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
ces.setConsumerListener(new ConsumerListener(){
|
||||||
|
public void onConsumerEvent(ConsumerEvent event) {
|
||||||
|
if( actualConnected.get() < count ) {
|
||||||
|
actualConnected.set(event.getConsumerCount());
|
||||||
|
}
|
||||||
|
if( event.getConsumerCount() >= count ) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ces.start();
|
||||||
|
|
||||||
|
latch.await(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
assertTrue("Expected at least "+count+" consumers to connect, but only "+actualConnected.get()+" connectect within "+timeout+" ms", actualConnected.get() >= count);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
ces.stop();
|
||||||
|
conn.close();
|
||||||
|
brokerItem.connections.remove(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
|
protected void sendMessages(String brokerName, Destination destination, int count) throws Exception {
|
||||||
BrokerItem brokerItem = brokers.get(brokerName);
|
BrokerItem brokerItem = brokers.get(brokerName);
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,10 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for consumers to get propagated
|
// wait for consumers to get propagated
|
||||||
Thread.sleep(5000);
|
for (int i = 1; i <= BROKER_COUNT; i++) {
|
||||||
|
// all consumers on the remote brokers look like 1 consumer to the local broker.
|
||||||
|
assertConsumersConnect("Broker" + i, dest, (BROKER_COUNT-1)+CONSUMER_COUNT, 30000);
|
||||||
|
}
|
||||||
|
|
||||||
// Send messages
|
// Send messages
|
||||||
for (int i = 1; i <= BROKER_COUNT; i++) {
|
for (int i = 1; i <= BROKER_COUNT; i++) {
|
||||||
|
|
Loading…
Reference in New Issue