mirror of https://github.com/apache/activemq.git
Additional fixes and test updates. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1409045 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7a0c1f85ab
commit
dd0b16d38b
|
@ -227,6 +227,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bridges.clear();
|
bridges.clear();
|
||||||
|
activeEvents.clear();
|
||||||
try {
|
try {
|
||||||
this.discoveryAgent.stop();
|
this.discoveryAgent.stop();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.activemq.broker.region.TopicRegion;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.BrokerInfo;
|
||||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
import org.apache.activemq.network.NetworkBridge;
|
import org.apache.activemq.network.NetworkBridge;
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
|
@ -202,6 +204,68 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timed wait for {@link #hasBridge(String, String)}.
|
||||||
|
*
|
||||||
|
* @see #hasBridge(String, String)
|
||||||
|
*
|
||||||
|
* @param localBrokerName
|
||||||
|
* - the name of the broker on the "local" side of the bridge
|
||||||
|
* @param remoteBrokerName
|
||||||
|
* - the name of the broker on the "remote" side of the bridge
|
||||||
|
* @param time
|
||||||
|
* - the maximum time to wait for the bridge to be established
|
||||||
|
* @param units
|
||||||
|
* - the units for <param>time</param>
|
||||||
|
* @throws InterruptedException
|
||||||
|
* - if the calling thread is interrupted
|
||||||
|
* @throws TimeoutException
|
||||||
|
* - if the bridge is not established within the time limit
|
||||||
|
* @throws Exception
|
||||||
|
* - some other unknown error occurs
|
||||||
|
*/
|
||||||
|
protected void waitForBridge(final String localBrokerName,
|
||||||
|
final String remoteBrokerName, long time, TimeUnit units)
|
||||||
|
throws InterruptedException, TimeoutException, Exception {
|
||||||
|
if (!Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() {
|
||||||
|
return hasBridge(localBrokerName, remoteBrokerName);
|
||||||
|
}
|
||||||
|
}, units.toMillis(time))) {
|
||||||
|
throw new TimeoutException("Bridge not established from broker "
|
||||||
|
+ localBrokerName + " to " + remoteBrokerName + " within "
|
||||||
|
+ units.toMillis(time) + " milliseconds.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines whether a bridge has been established between the specified
|
||||||
|
* brokers.Establishment means that connections have been created and broker
|
||||||
|
* info has been exchanged. Due to the asynchronous nature of the
|
||||||
|
* connections, there is still a possibility that the bridge may fail
|
||||||
|
* shortly after establishment.
|
||||||
|
*
|
||||||
|
* @param localBrokerName
|
||||||
|
* - the name of the broker on the "local" side of the bridge
|
||||||
|
* @param remoteBrokerName
|
||||||
|
* - the name of the broker on the "remote" side of the bridge
|
||||||
|
*/
|
||||||
|
protected boolean hasBridge(String localBrokerName, String remoteBrokerName) {
|
||||||
|
final BrokerItem fromBroker = brokers.get(localBrokerName);
|
||||||
|
if (fromBroker == null) {
|
||||||
|
throw new IllegalArgumentException("Unknown broker: "
|
||||||
|
+ localBrokerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker()
|
||||||
|
.getPeerBrokerInfos()) {
|
||||||
|
if (peerInfo.getBrokerName().equals(remoteBrokerName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
protected void waitForBridgeFormation() throws Exception {
|
protected void waitForBridgeFormation() throws Exception {
|
||||||
waitForBridgeFormation(1);
|
waitForBridgeFormation(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent;
|
||||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
import org.apache.activemq.network.NetworkBridge;
|
import org.apache.activemq.network.NetworkBridge;
|
||||||
import org.apache.activemq.network.NetworkBridgeListener;
|
import org.apache.activemq.network.NetworkBridgeListener;
|
||||||
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
||||||
|
@ -49,6 +50,18 @@ import org.junit.Assert;
|
||||||
* being reported as active.
|
* being reported as active.
|
||||||
*/
|
*/
|
||||||
public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
|
final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since these tests involve wait conditions, protect against indefinite
|
||||||
|
* waits (due to unanticipated issues).
|
||||||
|
*/
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
setAutoFail(true);
|
||||||
|
setMaxTestTime(MAX_TEST_TIME);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test demonstrates how concurrent attempts to establish a bridge to
|
* This test demonstrates how concurrent attempts to establish a bridge to
|
||||||
* the same remote broker are allowed to occur. Connection uniqueness will
|
* the same remote broker are allowed to occur. Connection uniqueness will
|
||||||
|
@ -57,7 +70,9 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
* {@link DiscoveryNetworkConnector#activeBridges()} that represents the
|
* {@link DiscoveryNetworkConnector#activeBridges()} that represents the
|
||||||
* successful first bridge creation attempt.
|
* successful first bridge creation attempt.
|
||||||
*/
|
*/
|
||||||
public void x_testLostActiveBridge() throws Exception {
|
public void testLostActiveBridge() throws Exception {
|
||||||
|
final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
|
||||||
|
|
||||||
// Start two brokers with a bridge from broker1 to broker2.
|
// Start two brokers with a bridge from broker1 to broker2.
|
||||||
BrokerService broker1 = createBroker(new URI(
|
BrokerService broker1 = createBroker(new URI(
|
||||||
"broker:(vm://broker1)/broker1?persistent=false"));
|
"broker:(vm://broker1)/broker1?persistent=false"));
|
||||||
|
@ -87,21 +102,41 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
|
|
||||||
// Start a bridge from broker1 to broker2. The discovery agent attempts
|
// Start a bridge from broker1 to broker2. The discovery agent attempts
|
||||||
// to create the bridge concurrently with two threads, and the
|
// to create the bridge concurrently with two threads, and the
|
||||||
// synchronization in createBridge ensures that both threads actually
|
// synchronization in createBridge ensures that pre-patch both threads
|
||||||
// attempt to start bridges.
|
// actually attempt to start bridges. Post-patch, only one thread is
|
||||||
|
// allowed to start the bridge.
|
||||||
|
final CountDownLatch attemptLatch = new CountDownLatch(2);
|
||||||
final CountDownLatch createLatch = new CountDownLatch(2);
|
final CountDownLatch createLatch = new CountDownLatch(2);
|
||||||
|
|
||||||
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
|
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
|
||||||
|
@Override
|
||||||
|
public void onServiceAdd(DiscoveryEvent event) {
|
||||||
|
// Pre-and-post patch, two threads attempt to establish a bridge
|
||||||
|
// to the same remote broker.
|
||||||
|
attemptLatch.countDown();
|
||||||
|
super.onServiceAdd(event);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected NetworkBridge createBridge(Transport localTransport,
|
protected NetworkBridge createBridge(Transport localTransport,
|
||||||
Transport remoteTransport, final DiscoveryEvent event) {
|
Transport remoteTransport, final DiscoveryEvent event) {
|
||||||
createLatch.countDown();
|
// Pre-patch, the two threads are allowed to create the bridge.
|
||||||
|
// Post-patch, only the first thread is allowed. Wait a
|
||||||
|
// reasonable delay once both attempts are detected to allow
|
||||||
|
// the two bridge creations to occur concurrently (pre-patch).
|
||||||
|
// Post-patch, the wait will timeout and allow the first (and
|
||||||
|
// only) bridge creation to occur.
|
||||||
try {
|
try {
|
||||||
createLatch.await();
|
attemptLatch.await();
|
||||||
} catch (InterruptedException e) {
|
createLatch.countDown();
|
||||||
}
|
createLatch.await(ATTEMPT_TO_CREATE_DELAY,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
return super.createBridge(localTransport, remoteTransport,
|
return super.createBridge(localTransport, remoteTransport,
|
||||||
event);
|
event);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.interrupted();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -151,12 +186,16 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
broker1.addNetworkConnector(nc);
|
broker1.addNetworkConnector(nc);
|
||||||
nc.start();
|
nc.start();
|
||||||
|
|
||||||
// The bridge should be formed by the second creation attempt, but the
|
// Wait for the bridge to be formed by the first attempt.
|
||||||
// wait will time out because the active bridge entry from the second
|
waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
|
||||||
// (successful) bridge creation attempt is removed by the first
|
MAX_TEST_TIME, TimeUnit.MILLISECONDS);
|
||||||
// (unsuccessful) bridge creation attempt.
|
|
||||||
waitForBridgeFormation();
|
|
||||||
|
|
||||||
|
// Pre-patch, the second bridge creation attempt fails and removes the
|
||||||
|
// first (successful) bridge creation attempt from the
|
||||||
|
// list of active bridges. Post-patch, the second bridge creation
|
||||||
|
// attempt is prevented, so the first bridge creation attempt
|
||||||
|
// remains "active". This assertion is expected to fail pre-patch and
|
||||||
|
// pass post-patch.
|
||||||
Assert.assertFalse(nc.activeBridges().isEmpty());
|
Assert.assertFalse(nc.activeBridges().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +203,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
* This test demonstrates a race condition where a failed bridge can be
|
* This test demonstrates a race condition where a failed bridge can be
|
||||||
* removed from the list of active bridges in
|
* removed from the list of active bridges in
|
||||||
* {@link DiscoveryNetworkConnector} before it has been added. Eventually,
|
* {@link DiscoveryNetworkConnector} before it has been added. Eventually,
|
||||||
* the failed bridge is added, but never removed, which prevents subsequent
|
* the failed bridge is added, but never removed, which causes subsequent
|
||||||
* bridge creation attempts to be ignored. The result is a network connector
|
* bridge creation attempts to be ignored. The result is a network connector
|
||||||
* that thinks it has an active bridge, when in fact it doesn't.
|
* that thinks it has an active bridge, when in fact it doesn't.
|
||||||
*/
|
*/
|
||||||
|
@ -306,4 +345,40 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
||||||
// Therefore, this wait will time out and cause the test to fail.
|
// Therefore, this wait will time out and cause the test to fail.
|
||||||
Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
|
Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test verifies that when a network connector is restarted, any
|
||||||
|
* bridges that were active at the time of the stop are allowed to be
|
||||||
|
* re-established (i.e., the "active events" data structure in
|
||||||
|
* {@link DiscoveryNetworkConnector} is reset.
|
||||||
|
*/
|
||||||
|
public void testAllowAttemptsAfterRestart() throws Exception {
|
||||||
|
final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10);
|
||||||
|
|
||||||
|
// Start two brokers with a bridge from broker1 to broker2.
|
||||||
|
BrokerService broker1 = createBroker(new URI(
|
||||||
|
"broker:(vm://broker1)/broker1?persistent=false"));
|
||||||
|
final BrokerService broker2 = createBroker(new URI(
|
||||||
|
"broker:(vm://broker2)/broker2?persistent=false"));
|
||||||
|
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
// Start a bridge from broker1 to broker2.
|
||||||
|
NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(),
|
||||||
|
broker2.getBrokerName());
|
||||||
|
nc.start();
|
||||||
|
|
||||||
|
waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
|
||||||
|
MAX_TEST_TIME, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
// Restart the network connector and verify that the bridge is
|
||||||
|
// re-established. The pause between start/stop is to account for the
|
||||||
|
// asynchronous closure.
|
||||||
|
nc.stop();
|
||||||
|
Thread.sleep(STOP_DELAY);
|
||||||
|
nc.start();
|
||||||
|
|
||||||
|
waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(),
|
||||||
|
MAX_TEST_TIME, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue