When a broker was stopping it was sending out a cluster update after tearing down its bridges so any client connected to it would lose its awareness of other brokers in the cluster. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478835 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-03 15:21:25 +00:00
parent 7450a32ae7
commit 3a8eb74a9f
4 changed files with 141 additions and 91 deletions

View File

@ -2829,4 +2829,7 @@ public class BrokerService implements Service {
return this.slave; return this.slave;
} }
public boolean isStopping() {
return this.stopping.get();
}
} }

View File

@ -16,6 +16,23 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
@ -40,22 +57,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* Routes Broker operations to the correct messaging regions for processing. * Routes Broker operations to the correct messaging regions for processing.
* *
@ -94,6 +95,7 @@ public class RegionBroker extends EmptyBroker {
private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
private final Runnable purgeInactiveDestinationsTask = new Runnable() { private final Runnable purgeInactiveDestinationsTask = new Runnable() {
@Override
public void run() { public void run() {
purgeInactiveDestinations(); purgeInactiveDestinations();
} }
@ -526,7 +528,11 @@ public class RegionBroker extends EmptyBroker {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
} }
removeBrokerInClusterUpdate(info); // When stopping don't send cluster updates since we are the one's tearing down
// our own bridges.
if (!brokerService.isStopping()) {
removeBrokerInClusterUpdate(info);
}
} }
} }
@ -730,6 +736,7 @@ public class RegionBroker extends EmptyBroker {
return this.scheduler; return this.scheduler;
} }
@Override
public ThreadPoolExecutor getExecutor() { public ThreadPoolExecutor getExecutor() {
return this.executor; return this.executor;
} }

View File

@ -286,6 +286,10 @@ public class FailoverTransport implements CompositeTransport {
public final void handleConnectionControl(ConnectionControl control) { public final void handleConnectionControl(ConnectionControl control) {
String reconnectStr = control.getReconnectTo(); String reconnectStr = control.getReconnectTo();
if (LOG.isTraceEnabled()) {
LOG.trace("Received ConnectionControl: {}", control);
}
if (reconnectStr != null) { if (reconnectStr != null) {
reconnectStr = reconnectStr.trim(); reconnectStr = reconnectStr.trim();
if (reconnectStr.length() > 0) { if (reconnectStr.length() > 0) {

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import org.apache.activemq.broker.TransportConnector;
import org.mortbay.log.Log;
/** /**
* Complex cluster test that will exercise the dynamic failover capabilities of * Complex cluster test that will exercise the dynamic failover capabilities of
@ -26,21 +29,19 @@ package org.apache.activemq.transport.failover;
*/ */
public class FailoverComplexClusterTest extends FailoverClusterTestSupport { public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616"; private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618"; private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626"; private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627"; private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628"; private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628";
private static final String BROKER_A_NAME = "BROKERA"; private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB"; private static final String BROKER_B_NAME = "BROKERB";
private static final String BROKER_C_NAME = "BROKERC"; private static final String BROKER_C_NAME = "BROKERC";
/** /**
* Basic dynamic failover 3 broker test * Basic dynamic failover 3 broker test
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterSingleConnectorBasic() throws Exception { public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
@ -56,14 +57,14 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, null, null); runTests(false, null, null, null);
} }
/** /**
* Tests a 3 broker configuration to ensure that the backup is random and * Tests a 3 broker configuration to ensure that the backup is random and
* supported in a cluster. useExponentialBackOff is set to false and * supported in a cluster. useExponentialBackOff is set to false and
* maxReconnectAttempts is set to 1 to move through the list quickly for * maxReconnectAttempts is set to 1 to move through the list quickly for
* this test. * this test.
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception { public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
initSingleTcBroker("", null, null); initSingleTcBroker("", null, null);
@ -77,14 +78,14 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, null, null); runTests(false, null, null, null);
} }
/** /**
* Tests a 3 broker cluster that passes in connection params on the * Tests a 3 broker cluster that passes in connection params on the
* transport connector. Prior versions of AMQ passed the TC connection * transport connector. Prior versions of AMQ passed the TC connection
* params to the client and this should not happen. The chosen param is not * params to the client and this should not happen. The chosen param is not
* compatible with the client and will throw an error if used. * compatible with the client and will throw an error if used.
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception { public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
initSingleTcBroker("?transport.closeAsync=false", null, null); initSingleTcBroker("?transport.closeAsync=false", null, null);
@ -97,10 +98,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, null, null); runTests(false, null, null, null);
} }
/** /**
* Tests a 3 broker cluster using a cluster filter of * * Tests a 3 broker cluster using a cluster filter of *
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterWithClusterFilter() throws Exception { public void testThreeBrokerClusterWithClusterFilter() throws Exception {
@ -114,12 +114,12 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, "*", null); runTests(false, null, "*", null);
} }
/** /**
* Test to verify that a broker with multiple transport connections only the * Test to verify that a broker with multiple transport connections only the
* one marked to update clients is propagate * one marked to update clients is propagate
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception { public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
initMultiTcCluster("", null); initMultiTcCluster("", null);
@ -133,11 +133,11 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(true, null, null, null); runTests(true, null, null, null);
} }
/** /**
* Test to verify the reintroduction of the A Broker * Test to verify the reintroduction of the A Broker
* *
* @throws Exception * @throws Exception
*/ */
public void testOriginalBrokerRestart() throws Exception { public void testOriginalBrokerRestart() throws Exception {
initSingleTcBroker("", null, null); initSingleTcBroker("", null, null);
@ -164,12 +164,12 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
} }
/** /**
* Test to ensure clients are evenly to all available brokers in the * Test to ensure clients are evenly to all available brokers in the
* network. * network.
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterClientDistributions() throws Exception { public void testThreeBrokerClusterClientDistributions() throws Exception {
initSingleTcBroker("", null, null); initSingleTcBroker("", null, null);
@ -182,12 +182,12 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runClientDistributionTests(false, null, null, null); runClientDistributionTests(false, null, null, null);
} }
/** /**
* Test to verify that clients are distributed with no less than 20% of the * Test to verify that clients are distributed with no less than 20% of the
* clients on any one broker. * clients on any one broker.
* *
* @throws Exception * @throws Exception
*/ */
public void testThreeBrokerClusterDestinationFilter() throws Exception { public void testThreeBrokerClusterDestinationFilter() throws Exception {
initSingleTcBroker("", null, null); initSingleTcBroker("", null, null);
@ -199,23 +199,61 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, null, "Queue.TEST.FOO.>"); runTests(false, null, null, "Queue.TEST.FOO.>");
} }
public void testFailOverWithUpdateClientsOnRemove() throws Exception{
// Broker A
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
connectorA.setName("openwire");
connectorA.setRebalanceClusterClients(true);
connectorA.setUpdateClusterClients(true);
connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
getBroker(BROKER_A_NAME).start();
/** // Broker B
* Runs a 3 Broker dynamic failover test: <br/> addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
* <ul> TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
* <li>asserts clients are distributed across all 3 brokers</li> connectorB.setName("openwire");
* <li>asserts clients are distributed across 2 brokers after removing the 3rd</li> connectorB.setRebalanceClusterClients(true);
* <li>asserts clients are distributed across all 3 brokers after connectorB.setUpdateClusterClients(true);
* reintroducing the 3rd broker</li> connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
* </ul> addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
* getBroker(BROKER_B_NAME).start();
* @param multi
* @param tcParams getBroker(BROKER_B_NAME).waitUntilStarted();
* @param clusterFilter Thread.sleep(1000);
* @param destinationFilter
* @throws Exception // create client connecting only to A. It should receive broker B address whet it connects to A.
* @throws InterruptedException setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=true");
*/ createClients(1);
Thread.sleep(5000);
// We stop broker A.
Log.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS);
getBroker(BROKER_A_NAME).stop();
getBroker(BROKER_A_NAME).waitUntilStopped();
Thread.sleep(5000);
// Client should failover to B.
assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
}
/**
* Runs a 3 Broker dynamic failover test: <br/>
* <ul>
* <li>asserts clients are distributed across all 3 brokers</li>
* <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
* <li>asserts clients are distributed across all 3 brokers after
* reintroducing the 3rd broker</li>
* </ul>
*
* @param multi
* @param tcParams
* @param clusterFilter
* @param destinationFilter
* @throws Exception
* @throws InterruptedException
*/
private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException { private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
@ -226,14 +264,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
Thread.sleep(5000); Thread.sleep(5000);
assertClientsConnectedToTwoBrokers(); assertClientsConnectedToTwoBrokers();
createBrokerC(multi, tcParams, clusterFilter, destinationFilter); createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
getBroker(BROKER_C_NAME).waitUntilStarted(); getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000); Thread.sleep(5000);
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
} }
/** /**
* @param multi * @param multi
@ -288,9 +325,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
createBrokerC(true, params, clusterFilter, null); createBrokerC(true, params, clusterFilter, null);
getBroker(BROKER_C_NAME).waitUntilStarted(); getBroker(BROKER_C_NAME).waitUntilStarted();
} }
private void createBrokerA(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception { private void createBrokerA(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params; final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_A_NAME) == null) { if (getBroker(BROKER_A_NAME) == null) {
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true); addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
@ -307,7 +344,7 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
} }
private void createBrokerB(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception { private void createBrokerB(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params; final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_B_NAME) == null) { if (getBroker(BROKER_B_NAME) == null) {
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true); addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
@ -324,7 +361,7 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
} }
private void createBrokerC(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception { private void createBrokerC(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params; final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_C_NAME) == null) { if (getBroker(BROKER_C_NAME) == null) {
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true); addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
@ -339,5 +376,4 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
getBroker(BROKER_C_NAME).start(); getBroker(BROKER_C_NAME).start();
} }
} }
} }