git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-01 20:53:51 +00:00
parent 4742c7d86f
commit e777126d30
2 changed files with 59 additions and 24 deletions

View File

@ -143,7 +143,6 @@ public class FailoverTransport implements CompositeTransport {
if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
result = doReconnect(); result = doReconnect();
buildBackup = false; buildBackup = false;
connectedToPriority = isPriority(connectedTransportURI);
} }
} }
if (buildBackup) { if (buildBackup) {
@ -264,6 +263,7 @@ public class FailoverTransport implements CompositeTransport {
failedConnectTransportURI = connectedTransportURI; failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null; connectedTransportURI = null;
connected = false; connected = false;
connectedToPriority = false;
// notify before any reconnect attempt so ack state can be whacked // notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) { if (transportListener != null) {
@ -922,7 +922,7 @@ public class FailoverTransport implements CompositeTransport {
failure = new IOException("No uris available to connect to."); failure = new IOException("No uris available to connect to.");
} else { } else {
if (doRebalance) { if (doRebalance) {
if (compareURIs(connectList.get(0), connectedTransportURI)) { if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
// already connected to first in the list, no need to rebalance // already connected to first in the list, no need to rebalance
doRebalance = false; doRebalance = false;
return false; return false;
@ -930,6 +930,7 @@ public class FailoverTransport implements CompositeTransport {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
} }
try { try {
Transport transport = this.connectedTransport.getAndSet(null); Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) { if (transport != null) {
@ -1008,12 +1009,13 @@ public class FailoverTransport implements CompositeTransport {
restoreTransport(transport); restoreTransport(transport);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Connection established"); LOG.debug("Connection established");
} }
reconnectDelay = initialReconnectDelay; reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri; connectedTransportURI = uri;
connectedTransport.set(transport); connectedTransport.set(transport);
connectedToPriority = isPriority(connectedTransportURI);
reconnectMutex.notifyAll(); reconnectMutex.notifyAll();
connectFailures = 0; connectFailures = 0;
@ -1201,6 +1203,10 @@ public class FailoverTransport implements CompositeTransport {
} }
protected boolean isPriority(URI uri) { protected boolean isPriority(URI uri) {
if (!priorityBackup) {
return false;
}
if (!priorityList.isEmpty()) { if (!priorityList.isEmpty()) {
return priorityList.contains(uri); return priorityList.contains(uri);
} }
@ -1326,8 +1332,9 @@ public class FailoverTransport implements CompositeTransport {
private boolean compareURIs(final URI first, final URI second) { private boolean compareURIs(final URI first, final URI second) {
boolean result = false;
if (first == null || second == null) { if (first == null || second == null) {
return false; return result;
} }
if (first.getPort() == second.getPort()) { if (first.getPort() == second.getPort()) {
@ -1336,25 +1343,26 @@ public class FailoverTransport implements CompositeTransport {
try { try {
firstAddr = InetAddress.getByName(first.getHost()); firstAddr = InetAddress.getByName(first.getHost());
secondAddr = InetAddress.getByName(second.getHost()); secondAddr = InetAddress.getByName(second.getHost());
if (firstAddr.equals(secondAddr)) {
result = true;
}
} catch(IOException e) { } catch(IOException e) {
if (firstAddr == null) { if (firstAddr == null) {
LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e); LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
} else { } else {
LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e); LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
} }
if (first.getHost().equalsIgnoreCase(second.getHost())) { if (first.getHost().equalsIgnoreCase(second.getHost())) {
return true; result = true;
} }
} }
if (firstAddr.equals(secondAddr)) {
return true;
}
} }
return false; return result;
} }
private InputStreamReader getURLStream(String path) throws IOException { private InputStreamReader getURLStream(String path) throws IOException {

View File

@ -16,13 +16,11 @@
*/ */
package org.apache.activemq.transport.failover; package org.apache.activemq.transport.failover;
import org.apache.activemq.broker.BrokerService; import java.util.HashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class FailoverPriorityTest extends FailoverClusterTestSupport { public class FailoverPriorityTest extends FailoverClusterTestSupport {
protected final Logger LOG = LoggerFactory.getLogger(getClass()); protected final Logger LOG = LoggerFactory.getLogger(getClass());
@ -30,7 +28,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
private HashMap<String,String> urls = new HashMap<String,String>(); private final HashMap<String,String> urls = new HashMap<String,String>();
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
@ -42,8 +40,8 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
private static final String BROKER_A_NAME = "BROKERA"; private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB"; private static final String BROKER_B_NAME = "BROKERB";
private static final String BROKER_C_NAME = "BROKERC"; private static final String BROKER_C_NAME = "BROKERC";
public void testPriorityBackup() throws Exception { public void testPriorityBackup() throws Exception {
createBrokerA(); createBrokerA();
createBrokerB(); createBrokerB();
@ -57,7 +55,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
restart(false, BROKER_A_NAME, BROKER_B_NAME); restart(false, BROKER_A_NAME, BROKER_B_NAME);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
restart(true, BROKER_A_NAME, BROKER_B_NAME); restart(true, BROKER_A_NAME, BROKER_B_NAME);
} }
@ -126,7 +124,36 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
restart(true, BROKER_A_NAME, BROKER_B_NAME); restart(true, BROKER_A_NAME, BROKER_B_NAME);
} }
public void testPriorityBackupAndUpdateClients() throws Exception {
// Broker A
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true);
addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
getBroker(BROKER_A_NAME).start();
// Broker B
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true);
addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
getBroker(BROKER_B_NAME).start();
getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
LOG.info("Client URI will be: " + getClientUrl());
createClients(5);
// Let's wait a little bit longer just in case it takes a while to realize that the
// Broker A is the one with higher priority.
Thread.sleep(5000);
assertAllConnectedTo(urls.get(BROKER_A_NAME));
}
private void restart(boolean primary, String primaryName, String secondaryName) throws Exception { private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
Thread.sleep(1000); Thread.sleep(1000);
@ -159,9 +186,9 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
Thread.sleep(5000); Thread.sleep(5000);
assertAllConnectedTo(urls.get(primaryName)); assertAllConnectedTo(urls.get(primaryName));
} }
private void createBrokerByName(String name) throws Exception { private void createBrokerByName(String name) throws Exception {
if (name.equals(BROKER_A_NAME)) { if (name.equals(BROKER_A_NAME)) {
createBrokerA(); createBrokerA();