https://issues.apache.org/jira/browse/AMQ-3706 - balance randomize with cluster update

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1242748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-02-10 11:48:56 +00:00
parent 2cb8ed221b
commit beda82ad5a
4 changed files with 236 additions and 73 deletions

View File

@ -39,7 +39,10 @@ import javax.management.ObjectName;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -74,6 +77,8 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private String updateClusterFilter; private String updateClusterFilter;
private boolean auditNetworkProducers = false; private boolean auditNetworkProducers = false;
Random rnd = new Random(System.currentTimeMillis());
public TransportConnector() { public TransportConnector() {
} }
@ -401,29 +406,24 @@ public class TransportConnector implements Connector, BrokerServiceAware {
protected ConnectionControl getConnectionControl() { protected ConnectionControl getConnectionControl() {
boolean rebalance = isRebalanceClusterClients(); boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = ""; String connectedBrokers = "";
String self = "";
String separator = ""; String separator = "";
if (isUpdateClusterClients()) { if (isUpdateClusterClients()) {
if (brokerService.getDefaultSocketURIString() != null) { ArrayList<String> uris = new ArrayList<String>();
self += brokerService.getDefaultSocketURIString(); uris.add(brokerService.getDefaultSocketURIString());
} for (BrokerInfo info: broker.getPeerBrokerInfos()) {
if (rebalance == false) { if (isMatchesClusterFilter(info.getBrokerName())) {
connectedBrokers += self; uris.add(info.getBrokerURL());
separator = ",";
}
if (this.broker.getPeerBrokerInfos() != null) {
for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
if (isMatchesClusterFilter(info.getBrokerName())) {
connectedBrokers += separator;
connectedBrokers += info.getBrokerURL();
separator = ",";
}
} }
} }
if (rebalance) { if (rebalance) {
connectedBrokers += separator + self; Collections.shuffle(uris, rnd);
} }
for (String uri: uris) {
connectedBrokers += separator + uri;
separator = ",";
}
} }
ConnectionControl control = new ConnectionControl(); ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers); control.setConnectedBrokers(connectedBrokers);
@ -437,6 +437,9 @@ public class TransportConnector implements Connector, BrokerServiceAware {
ConnectionControl control = getConnectionControl(); ConnectionControl control = getConnectionControl();
for (Connection c : this.connections) { for (Connection c : this.connections) {
c.updateClient(control); c.updateClient(control);
if (isRebalanceClusterClients()) {
control = getConnectionControl();
}
} }
} }
} }

View File

@ -50,6 +50,7 @@ import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -704,12 +705,16 @@ public class FailoverTransport implements CompositeTransport {
} }
private List<URI> getConnectList() { private List<URI> getConnectList() {
ArrayList<URI> l = new ArrayList<URI>(uris); if (!updated.isEmpty()) {
for (URI uri : updated) { if (failedConnectTransportURI != null) {
if (!l.contains(uri)) { boolean removed = updated.remove(failedConnectTransportURI);
l.add(uri); if (removed) {
} updated.add(failedConnectTransportURI);
}
}
return updated;
} }
ArrayList<URI> l = new ArrayList<URI>(uris);
boolean removed = false; boolean removed = false;
if (failedConnectTransportURI != null) { if (failedConnectTransportURI != null) {
removed = l.remove(failedConnectTransportURI); removed = l.remove(failedConnectTransportURI);
@ -1167,7 +1172,7 @@ public class FailoverTransport implements CompositeTransport {
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) { if (isUpdateURIsSupported()) {
List<URI> copy = new ArrayList<URI>(this.updated); HashSet<URI> copy = new HashSet<URI>(this.updated);
updated.clear(); updated.clear();
if (updatedURIs != null && updatedURIs.length > 0) { if (updatedURIs != null && updatedURIs.length > 0) {
for (URI uri : updatedURIs) { for (URI uri : updatedURIs) {
@ -1175,7 +1180,7 @@ public class FailoverTransport implements CompositeTransport {
updated.add(uri); updated.add(uri);
} }
} }
if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated)) { if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet(updated))) {
buildBackups(); buildBackups();
synchronized (reconnectMutex) { synchronized (reconnectMutex) {
reconnect(rebalance); reconnect(rebalance);

View File

@ -50,7 +50,9 @@ public class FailoverClusterTestSupport extends TestCase {
protected void assertClientsConnectedToTwoBrokers() { protected void assertClientsConnectedToTwoBrokers() {
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress()); if (c.getTransportChannel().getRemoteAddress() != null) {
set.add(c.getTransportChannel().getRemoteAddress());
}
} }
assertTrue("Only 2 connections should be found: " + set, assertTrue("Only 2 connections should be found: " + set,
set.size() == 2); set.size() == 2);
@ -59,17 +61,46 @@ public class FailoverClusterTestSupport extends TestCase {
protected void assertClientsConnectedToThreeBrokers() { protected void assertClientsConnectedToThreeBrokers() {
Set<String> set = new HashSet<String>(); Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress()); if (c.getTransportChannel().getRemoteAddress() != null) {
set.add(c.getTransportChannel().getRemoteAddress());
}
} }
assertTrue("Only 3 connections should be found: " + set, assertTrue("Only 3 connections should be found: " + set,
set.size() == 3); set.size() == 3);
} }
protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
int total = 0;
for (ActiveMQConnection c : connections) {
String key = c.getTransportChannel().getRemoteAddress();
if (key != null) {
total++;
if (clientConnectionCounts.containsKey(key)) {
double count = clientConnectionCounts.get(key);
count += 1.0;
clientConnectionCounts.put(key, count);
} else {
clientConnectionCounts.put(key, 1.0);
}
}
}
Set<String> keys = clientConnectionCounts.keySet();
for(String key: keys){
double count = (double)clientConnectionCounts.get(key);
double percentage = count / (double)total;
logger.info(count + " of " + total + " connections for " + key + " = " + percentage);
assertTrue("Connections distribution expected to be >= than " + minimumPercentage
+ ". Actuall distribution was " + percentage + " for connection " + key,
percentage >= minimumPercentage);
}
}
protected void assertAllConnectedTo(String url) throws Exception { protected void assertAllConnectedTo(String url) throws Exception {
for (ActiveMQConnection c : connections) { for (ActiveMQConnection c : connections) {
assertEquals(c.getTransportChannel().getRemoteAddress(), url); assertEquals(c.getTransportChannel().getRemoteAddress(), url);
} }
} }
protected void addBroker(String name, BrokerService brokerService) { protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService); brokers.put(name, brokerService);
@ -92,6 +123,7 @@ public class FailoverClusterTestSupport extends TestCase {
protected void destroyBrokerCluster() throws JMSException, Exception { protected void destroyBrokerCluster() throws JMSException, Exception {
for (BrokerService b : brokers.values()) { for (BrokerService b : brokers.values()) {
b.stop(); b.stop();
b.waitUntilStopped();
} }
brokers.clear(); brokers.clear();
} }
@ -142,10 +174,11 @@ public class FailoverClusterTestSupport extends TestCase {
createClients(NUMBER_OF_CLIENTS); createClients(NUMBER_OF_CLIENTS);
} }
protected void createClients(int num) throws Exception { @SuppressWarnings("unused")
protected void createClients(int numOfClients) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
clientUrl); clientUrl);
for (int i = 0; i < num; i++) { for (int i = 0; i < numOfClients; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory ActiveMQConnection c = (ActiveMQConnection) factory
.createConnection(); .createConnection();
c.start(); c.start();
@ -163,4 +196,4 @@ public class FailoverClusterTestSupport extends TestCase {
public void setClientUrl(String clientUrl) { public void setClientUrl(String clientUrl) {
this.clientUrl = clientUrl; this.clientUrl = clientUrl;
} }
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
/** /**
* Complex cluster test that will exercise the dynamic failover capabilities of * Complex cluster test that will exercise the dynamic failover capabilities of
* a network of brokers. Using a networking of 3 brokers where the 3rd broker is * a network of brokers. Using a networking of 3 brokers where the 3rd broker is
@ -37,9 +38,14 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
/**
* Basic dynamic failover 3 broker test
*
* @throws Exception
*/
public void testThreeBrokerClusterSingleConnectorBasic() throws Exception { public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
initSingleTcBroker("", null); initSingleTcBroker("", null, null);
Thread.sleep(2000); Thread.sleep(2000);
@ -47,36 +53,73 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
createClients(); createClients();
Thread.sleep(2000); Thread.sleep(2000);
runTests(false); runTests(false, null, null, null);
} }
/**
* Tests a 3 broker configuration to ensure that the backup is random and
* supported in a cluster. useExponentialBackOff is set to false and
* maxReconnectAttempts is set to 1 to move through the list quickly for
* this test.
*
* @throws Exception
*/
public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
public void testThreeBrokerClusterSingleConnectorBackup() throws Exception { initSingleTcBroker("", null, null);
initSingleTcBroker("", null);
Thread.sleep(2000); Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2"); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2&useExponentialBackOff=false&initialReconnectDelay=500");
createClients(); createClients();
Thread.sleep(2000); Thread.sleep(2000);
runTests(false); runTests(false, null, null, null);
} }
/**
* Tests a 3 broker cluster that passes in connection params on the
* transport connector. Prior versions of AMQ passed the TC connection
* params to the client and this should not happen. The chosen param is not
* compatible with the client and will throw an error if used.
*
* @throws Exception
*/
public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception { public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
initSingleTcBroker("?transport.closeAsync=false", null); initSingleTcBroker("?transport.closeAsync=false", null, null);
Thread.sleep(2000); Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")"); setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients(); createClients();
Thread.sleep(2000); Thread.sleep(2000);
runTests(false); runTests(false, null, null, null);
} }
/**
* Tests a 3 broker cluster using a cluster filter of *
*
* @throws Exception
*/
public void testThreeBrokerClusterWithClusterFilter() throws Exception {
initSingleTcBroker("?transport.closeAsync=false", null, null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
runTests(false, null, "*", null);
}
/**
* Test to verify that a broker with multiple transport connections only the
* one marked to update clients is propagate
*
* @throws Exception
*/
public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception { public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
initMultiTcCluster("", null); initMultiTcCluster("", null);
@ -87,11 +130,16 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
createClients(); createClients();
Thread.sleep(2000); Thread.sleep(2000);
runTests(true); runTests(true, null, null, null);
} }
/**
* Test to verify the reintroduction of the A Broker
*
* @throws Exception
*/
public void testOriginalBrokerRestart() throws Exception { public void testOriginalBrokerRestart() throws Exception {
initSingleTcBroker("", null); initSingleTcBroker("", null, null);
Thread.sleep(2000); Thread.sleep(2000);
@ -109,25 +157,66 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToTwoBrokers(); assertClientsConnectedToTwoBrokers();
createBrokerA(false, "", null); createBrokerA(false, null, null, null);
getBroker(BROKER_A_NAME).waitUntilStarted(); getBroker(BROKER_A_NAME).waitUntilStarted();
Thread.sleep(5000); Thread.sleep(5000);
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
} }
/**
* Test to ensure clients are evenly to all available brokers in the
* network.
*
* @throws Exception
*/
public void testThreeBrokerClusterClientDistributions() throws Exception {
/** initSingleTcBroker("", null, null);
* Runs a 3 tests: <br/>
* <ul> Thread.sleep(2000);
* <li>asserts clients are distributed across all 3 brokers</li> setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500");
* <li>asserts clients are distributed across 2 brokers after removing the 3rd</li> createClients(100);
* <li>asserts clients are distributed across all 3 brokers after reintroducing the 3rd broker</li> Thread.sleep(5000);
* </ul>
* @throws Exception runClientDistributionTests(false, null, null, null);
* @throws InterruptedException }
*/
private void runTests(boolean multi) throws Exception, InterruptedException { /**
* Test to verify that clients are distributed with no less than 20% of the
* clients on any one broker.
*
* @throws Exception
*/
public void testThreeBrokerClusterDestinationFilter() throws Exception {
initSingleTcBroker("", null, null);
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
runTests(false, null, null, "Queue.TEST.FOO.>");
}
/**
* Runs a 3 Broker dynamic failover test: <br/>
* <ul>
* <li>asserts clients are distributed across all 3 brokers</li>
* <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
* <li>asserts clients are distributed across all 3 brokers after
* reintroducing the 3rd broker</li>
* </ul>
*
* @param multi
* @param tcParams
* @param clusterFilter
* @param destinationFilter
* @throws Exception
* @throws InterruptedException
*/
private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
getBroker(BROKER_C_NAME).stop(); getBroker(BROKER_C_NAME).stop();
@ -137,13 +226,43 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
Thread.sleep(5000); Thread.sleep(5000);
assertClientsConnectedToTwoBrokers(); assertClientsConnectedToTwoBrokers();
createBrokerC(multi, "", null); createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
getBroker(BROKER_C_NAME).waitUntilStarted(); getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000); Thread.sleep(5000);
assertClientsConnectedToThreeBrokers(); assertClientsConnectedToThreeBrokers();
} }
/**
* @param multi
* @param tcParams
* @param clusterFilter
* @param destinationFilter
* @throws Exception
* @throws InterruptedException
*/
private void runClientDistributionTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
assertClientsConnectionsEvenlyDistributed(.25);
getBroker(BROKER_C_NAME).stop();
getBroker(BROKER_C_NAME).waitUntilStopped();
removeBroker(BROKER_C_NAME);
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
assertClientsConnectionsEvenlyDistributed(.20);
}
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
@ -152,30 +271,31 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
@Override @Override
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
shutdownClients(); shutdownClients();
destroyBrokerCluster();
Thread.sleep(2000); Thread.sleep(2000);
destroyBrokerCluster();
} }
private void initSingleTcBroker(String params, String clusterFilter) throws Exception { private void initSingleTcBroker(String params, String clusterFilter, String destinationFilter) throws Exception {
createBrokerA(false, params, clusterFilter); createBrokerA(false, params, clusterFilter, null);
createBrokerB(false, params, clusterFilter); createBrokerB(false, params, clusterFilter, null);
createBrokerC(false, params, clusterFilter); createBrokerC(false, params, clusterFilter, null);
getBroker(BROKER_C_NAME).waitUntilStarted(); getBroker(BROKER_C_NAME).waitUntilStarted();
} }
private void initMultiTcCluster(String params, String clusterFilter) throws Exception { private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
createBrokerA(true, params, clusterFilter); createBrokerA(true, params, clusterFilter, null);
createBrokerB(true, params, clusterFilter); createBrokerB(true, params, clusterFilter, null);
createBrokerC(true, params, clusterFilter); createBrokerC(true, params, clusterFilter, null);
getBroker(BROKER_C_NAME).waitUntilStarted(); getBroker(BROKER_C_NAME).waitUntilStarted();
} }
private void createBrokerA(boolean multi, String params, String clusterFilter) throws Exception { private void createBrokerA(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_A_NAME) == null) { if (getBroker(BROKER_A_NAME) == null) {
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + params, true); addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
if (multi) { if (multi) {
addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS, false); addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else { } else {
@ -186,12 +306,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
} }
} }
private void createBrokerB(boolean multi, String params, String clusterFilter) throws Exception { private void createBrokerB(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_B_NAME) == null) { if (getBroker(BROKER_B_NAME) == null) {
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + params, true); addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
if (multi) { if (multi) {
addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS, false); addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else { } else {
@ -202,12 +323,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
} }
} }
private void createBrokerC(boolean multi, String params, String clusterFilter) throws Exception { private void createBrokerC(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_C_NAME) == null) { if (getBroker(BROKER_C_NAME) == null) {
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME)); addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + params, true); addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
if (multi) { if (multi) {
addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS, false); addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS + tcParams, false);
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter); addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
} else { } else {