diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 929046690c..5aaf9d5ab6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -227,6 +227,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco } } bridges.clear(); + activeEvents.clear(); try { this.discoveryAgent.stop(); } catch (Exception e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 39c8b4dbec..80a48e8727 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; @@ -53,6 +54,7 @@ import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.network.NetworkConnector; @@ -202,6 +204,68 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { })); } + /** + * Timed wait for {@link #hasBridge(String, String)}. + * + * @see #hasBridge(String, String) + * + * @param localBrokerName + * - the name of the broker on the "local" side of the bridge + * @param remoteBrokerName + * - the name of the broker on the "remote" side of the bridge + * @param time + * - the maximum time to wait for the bridge to be established + * @param units + * - the units for time + * @throws InterruptedException + * - if the calling thread is interrupted + * @throws TimeoutException + * - if the bridge is not established within the time limit + * @throws Exception + * - some other unknown error occurs + */ + protected void waitForBridge(final String localBrokerName, + final String remoteBrokerName, long time, TimeUnit units) + throws InterruptedException, TimeoutException, Exception { + if (!Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() { + return hasBridge(localBrokerName, remoteBrokerName); + } + }, units.toMillis(time))) { + throw new TimeoutException("Bridge not established from broker " + + localBrokerName + " to " + remoteBrokerName + " within " + + units.toMillis(time) + " milliseconds."); + } + } + + /** + * Determines whether a bridge has been established between the specified + * brokers.Establishment means that connections have been created and broker + * info has been exchanged. Due to the asynchronous nature of the + * connections, there is still a possibility that the bridge may fail + * shortly after establishment. + * + * @param localBrokerName + * - the name of the broker on the "local" side of the bridge + * @param remoteBrokerName + * - the name of the broker on the "remote" side of the bridge + */ + protected boolean hasBridge(String localBrokerName, String remoteBrokerName) { + final BrokerItem fromBroker = brokers.get(localBrokerName); + if (fromBroker == null) { + throw new IllegalArgumentException("Unknown broker: " + + localBrokerName); + } + + for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker() + .getPeerBrokerInfos()) { + if (peerInfo.getBrokerName().equals(remoteBrokerName)) { + return true; + } + } + return false; + } + protected void waitForBridgeFormation() throws Exception { waitForBridgeFormation(1); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java index 56c8ae1271..c1dadd9fdb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java @@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkBridge; import org.apache.activemq.network.NetworkBridgeListener; +import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.discovery.DiscoveryAgent; @@ -49,6 +50,18 @@ import org.junit.Assert; * being reported as active. */ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { + final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2); + + /** + * Since these tests involve wait conditions, protect against indefinite + * waits (due to unanticipated issues). + */ + public void setUp() throws Exception { + setAutoFail(true); + setMaxTestTime(MAX_TEST_TIME); + super.setUp(); + } + /** * This test demonstrates how concurrent attempts to establish a bridge to * the same remote broker are allowed to occur. Connection uniqueness will @@ -57,7 +70,9 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { * {@link DiscoveryNetworkConnector#activeBridges()} that represents the * successful first bridge creation attempt. */ - public void x_testLostActiveBridge() throws Exception { + public void testLostActiveBridge() throws Exception { + final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15); + // Start two brokers with a bridge from broker1 to broker2. BrokerService broker1 = createBroker(new URI( "broker:(vm://broker1)/broker1?persistent=false")); @@ -87,21 +102,41 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { // Start a bridge from broker1 to broker2. The discovery agent attempts // to create the bridge concurrently with two threads, and the - // synchronization in createBridge ensures that both threads actually - // attempt to start bridges. + // synchronization in createBridge ensures that pre-patch both threads + // actually attempt to start bridges. Post-patch, only one thread is + // allowed to start the bridge. + final CountDownLatch attemptLatch = new CountDownLatch(2); final CountDownLatch createLatch = new CountDownLatch(2); DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() { + @Override + public void onServiceAdd(DiscoveryEvent event) { + // Pre-and-post patch, two threads attempt to establish a bridge + // to the same remote broker. + attemptLatch.countDown(); + super.onServiceAdd(event); + } + @Override protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { - createLatch.countDown(); + // Pre-patch, the two threads are allowed to create the bridge. + // Post-patch, only the first thread is allowed. Wait a + // reasonable delay once both attempts are detected to allow + // the two bridge creations to occur concurrently (pre-patch). + // Post-patch, the wait will timeout and allow the first (and + // only) bridge creation to occur. try { - createLatch.await(); + attemptLatch.await(); + createLatch.countDown(); + createLatch.await(ATTEMPT_TO_CREATE_DELAY, + TimeUnit.MILLISECONDS); + return super.createBridge(localTransport, remoteTransport, + event); } catch (InterruptedException e) { + Thread.interrupted(); + return null; } - return super.createBridge(localTransport, remoteTransport, - event); } }; @@ -151,12 +186,16 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { broker1.addNetworkConnector(nc); nc.start(); - // The bridge should be formed by the second creation attempt, but the - // wait will time out because the active bridge entry from the second - // (successful) bridge creation attempt is removed by the first - // (unsuccessful) bridge creation attempt. - waitForBridgeFormation(); + // Wait for the bridge to be formed by the first attempt. + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + // Pre-patch, the second bridge creation attempt fails and removes the + // first (successful) bridge creation attempt from the + // list of active bridges. Post-patch, the second bridge creation + // attempt is prevented, so the first bridge creation attempt + // remains "active". This assertion is expected to fail pre-patch and + // pass post-patch. Assert.assertFalse(nc.activeBridges().isEmpty()); } @@ -164,7 +203,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { * This test demonstrates a race condition where a failed bridge can be * removed from the list of active bridges in * {@link DiscoveryNetworkConnector} before it has been added. Eventually, - * the failed bridge is added, but never removed, which prevents subsequent + * the failed bridge is added, but never removed, which causes subsequent * bridge creation attempts to be ignored. The result is a network connector * that thinks it has an active bridge, when in fact it doesn't. */ @@ -306,4 +345,40 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport { // Therefore, this wait will time out and cause the test to fail. Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS)); } + + /** + * This test verifies that when a network connector is restarted, any + * bridges that were active at the time of the stop are allowed to be + * re-established (i.e., the "active events" data structure in + * {@link DiscoveryNetworkConnector} is reset. + */ + public void testAllowAttemptsAfterRestart() throws Exception { + final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10); + + // Start two brokers with a bridge from broker1 to broker2. + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + final BrokerService broker2 = createBroker(new URI( + "broker:(vm://broker2)/broker2?persistent=false")); + + startAllBrokers(); + + // Start a bridge from broker1 to broker2. + NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(), + broker2.getBrokerName()); + nc.start(); + + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + + // Restart the network connector and verify that the bridge is + // re-established. The pause between start/stop is to account for the + // asynchronous closure. + nc.stop(); + Thread.sleep(STOP_DELAY); + nc.start(); + + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + } }