make networkbridge tests more resilient to time related issues on slow machines

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@831876 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-02 12:35:12 +00:00
parent 763ef103d8
commit 4213e42514
8 changed files with 84 additions and 31 deletions

View File

@ -23,7 +23,6 @@ import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.network;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -282,4 +283,8 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
return removeSucceeded; return removeSucceeded;
} }
public Collection<NetworkBridge> activeBridges() {
return bridges.values();
}
} }

View File

@ -32,8 +32,6 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**

View File

@ -53,6 +53,7 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.BrokerFactoryBean; import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
@ -96,10 +97,10 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
// By default, bridge them using add network connector of the local broker // By default, bridge them using add network connector of the local broker
// and the first connector of the remote broker // and the first connector of the remote broker
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception { protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
List transportConnectors = remoteBroker.getTransportConnectors(); List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI; URI remoteURI;
if (!transportConnectors.isEmpty()) { if (!transportConnectors.isEmpty()) {
remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri(); remoteURI = transportConnectors.get(0).getConnectUri();
NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:" + remoteURI)); NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:" + remoteURI));
connector.setDynamicOnly(dynamicOnly); connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL); connector.setNetworkTTL(networkTTL);
@ -126,14 +127,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
Collection<BrokerItem> brokerList = brokers.values(); Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
BrokerService broker = i.next().broker; BrokerService broker = i.next().broker;
List transportConnectors = broker.getTransportConnectors(); List<TransportConnector> transportConnectors = broker.getTransportConnectors();
if (transportConnectors.isEmpty()) { if (transportConnectors.isEmpty()) {
broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
transportConnectors = broker.getTransportConnectors(); transportConnectors = broker.getTransportConnectors();
} }
TransportConnector transport = (TransportConnector)transportConnectors.get(0); TransportConnector transport = transportConnectors.get(0);
transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName); NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName);
nc.setNetworkTTL(ttl); nc.setNetworkTTL(ttl);
@ -145,6 +146,19 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
maxSetupTime = 8000; maxSetupTime = 8000;
} }
protected void waitForBridgeFormation() throws Exception {
for (BrokerItem brokerItem : brokers.values()) {
final BrokerService broker = brokerItem.broker;
if (!broker.getNetworkConnectors().isEmpty()) {
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
}});
}
}
}
protected void startAllBrokers() throws Exception { protected void startAllBrokers() throws Exception {
Collection<BrokerItem> brokerList = brokers.values(); Collection<BrokerItem> brokerList = brokers.values();
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
@ -465,6 +479,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
try { try {
c.close(); c.close();
} catch (ConnectionClosedException e) { } catch (ConnectionClosedException e) {
} catch (JMSException e) {
} }
} }

View File

@ -43,12 +43,13 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
private static final Log LOG = LogFactory.getLog(MultiBrokersMultiClientsTest.class); private static final Log LOG = LogFactory.getLog(MultiBrokersMultiClientsTest.class);
protected Map consumerMap; protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>(); Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
public void testTopicAllConnected() throws Exception { public void testTopicAllConnected() throws Exception {
bridgeAllBrokers(); bridgeAllBrokers();
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup topic destination // Setup topic destination
Destination dest = createDestination("TEST.FOO", true); Destination dest = createDestination("TEST.FOO", true);
@ -99,6 +100,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
public void testQueueAllConnected() throws Exception { public void testQueueAllConnected() throws Exception {
bridgeAllBrokers(); bridgeAllBrokers();
startAllBrokers(); startAllBrokers();
this.waitForBridgeFormation();
// Setup topic destination // Setup topic destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -132,7 +134,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
int totalMsg = 0; int totalMsg = 0;
for (int i = 1; i <= BROKER_COUNT; i++) { for (int i = 1; i <= BROKER_COUNT; i++) {
for (int j = 0; j < CONSUMER_COUNT; j++) { for (int j = 0; j < CONSUMER_COUNT; j++) {
MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j)); MessageIdList msgs = getConsumerMessages("Broker" + i, consumerMap.get("Consumer:" + i + ":" + j));
totalMsg += msgs.getMessageCount(); totalMsg += msgs.getMessageCount();
} }
} }
@ -153,7 +155,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false")); createBroker(new URI("broker:()/Broker" + i + "?persistent=false&useJmx=false"));
} }
consumerMap = new HashMap(); consumerMap = new HashMap<String, MessageConsumer>();
} }
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {

View File

@ -46,14 +46,14 @@ public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClien
} }
protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception { protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
List remoteTransports = remoteBroker.getTransportConnectors(); List<TransportConnector> remoteTransports = remoteBroker.getTransportConnectors();
List localTransports = localBroker.getTransportConnectors(); List<TransportConnector> localTransports = localBroker.getTransportConnectors();
URI remoteURI; URI remoteURI;
URI localURI; URI localURI;
if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) { if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri(); remoteURI = remoteTransports.get(0).getConnectUri();
localURI = ((TransportConnector)localTransports.get(0)).getConnectUri(); localURI = localTransports.get(0).getConnectUri();
// Ensure that we are connecting using tcp // Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) { if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
@ -77,8 +77,8 @@ public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClien
// Assign a tcp connector to each broker // Assign a tcp connector to each broker
int j = 0; int j = 0;
for (Iterator i = brokers.values().iterator(); i.hasNext();) { for (Iterator<BrokerItem> i = brokers.values().iterator(); i.hasNext();) {
((BrokerItem)i.next()).broker.addConnector("tcp://localhost:" + (61616 + j++)); i.next().broker.addConnector("tcp://localhost:" + (61616 + j++));
} }
bridges = new ArrayList<DemandForwardingBridge>(); bridges = new ArrayList<DemandForwardingBridge>();

View File

@ -34,14 +34,15 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import junit.framework.TestCase;
public class NoDuplicateOnTopicNetworkTest extends TestCase { public class NoDuplicateOnTopicNetworkTest extends TestCase {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(NoDuplicateOnTopicNetworkTest.class); .getLog(NoDuplicateOnTopicNetworkTest.class);
@ -71,6 +72,25 @@ public class NoDuplicateOnTopicNetworkTest extends TestCase {
Thread.sleep(3000); Thread.sleep(3000);
broker1 = createAndStartBroker("broker1", BROKER_1); broker1 = createAndStartBroker("broker1", BROKER_1);
Thread.sleep(1000); Thread.sleep(1000);
waitForBridgeFormation();
}
protected void waitForBridgeFormation() throws Exception {
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return !broker3.getNetworkConnectors().get(0).activeBridges().isEmpty();
}});
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return !broker2.getNetworkConnectors().get(0).activeBridges().isEmpty();
}});
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return !broker1.getNetworkConnectors().get(0).activeBridges().isEmpty();
}});
} }
private BrokerService createAndStartBroker(String name, String addr) private BrokerService createAndStartBroker(String name, String addr)

View File

@ -63,6 +63,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerB", "BrokerC"); bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -90,6 +91,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerB", "BrokerC"); bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -121,6 +123,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerB", "BrokerC", true, 1, false); bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -162,6 +165,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerB", "BrokerC", true, 1, false); bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -203,6 +207,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerC", "BrokerB"); bridgeBrokers("BrokerC", "BrokerB");
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -235,6 +240,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeBrokers("BrokerC", "BrokerA"); bridgeBrokers("BrokerC", "BrokerA");
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -265,6 +271,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeAllBrokers(); bridgeAllBrokers();
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -311,6 +318,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception { public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
bridgeAllBrokers("default", 3, false); bridgeAllBrokers("default", 3, false);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -336,6 +344,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
public void testAllConnectedWithSpare() throws Exception { public void testAllConnectedWithSpare() throws Exception {
bridgeAllBrokers("default", 3, false); bridgeAllBrokers("default", 3, false);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -362,6 +371,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
boolean suppressQueueDuplicateSubscriptions = false; boolean suppressQueueDuplicateSubscriptions = false;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions); bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -419,6 +429,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
boolean decreaseNetworkConsumerPriority = true; boolean decreaseNetworkConsumerPriority = true;
bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority); bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -462,6 +473,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeAllBrokers("default", 3, true); bridgeAllBrokers("default", 3, true);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);
@ -531,6 +543,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
}); });
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup consumers // Setup consumers
@ -558,6 +571,7 @@ public class ThreeBrokerQueueNetworkTest extends JmsMultipleBrokersTestSupport {
bridgeAllBrokers("default", 3, false); bridgeAllBrokers("default", 3, false);
startAllBrokers(); startAllBrokers();
waitForBridgeFormation();
// Setup destination // Setup destination
Destination dest = createDestination("TEST.FOO", false); Destination dest = createDestination("TEST.FOO", false);