From e777126d3006f15868e890864ec795f1cfeff324 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 1 May 2013 20:53:51 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4501 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478184 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 34 ++++++++----- .../failover/FailoverPriorityTest.java | 49 ++++++++++++++----- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 3f7400f496..3f711810ff 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -143,7 +143,6 @@ public class FailoverTransport implements CompositeTransport { if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { result = doReconnect(); buildBackup = false; - connectedToPriority = isPriority(connectedTransportURI); } } if (buildBackup) { @@ -264,6 +263,7 @@ public class FailoverTransport implements CompositeTransport { failedConnectTransportURI = connectedTransportURI; connectedTransportURI = null; connected = false; + connectedToPriority = false; // notify before any reconnect attempt so ack state can be whacked if (transportListener != null) { @@ -922,7 +922,7 @@ public class FailoverTransport implements CompositeTransport { failure = new IOException("No uris available to connect to."); } else { if (doRebalance) { - if (compareURIs(connectList.get(0), connectedTransportURI)) { + if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { // already connected to first in the list, no need to rebalance doRebalance = false; return false; @@ -930,6 +930,7 @@ public class FailoverTransport implements CompositeTransport { if (LOG.isDebugEnabled()) { LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); } + try { Transport transport = this.connectedTransport.getAndSet(null); if (transport != null) { @@ -1008,12 +1009,13 @@ public class FailoverTransport implements CompositeTransport { restoreTransport(transport); } - if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("Connection established"); - } + } reconnectDelay = initialReconnectDelay; connectedTransportURI = uri; connectedTransport.set(transport); + connectedToPriority = isPriority(connectedTransportURI); reconnectMutex.notifyAll(); connectFailures = 0; @@ -1201,6 +1203,10 @@ public class FailoverTransport implements CompositeTransport { } protected boolean isPriority(URI uri) { + if (!priorityBackup) { + return false; + } + if (!priorityList.isEmpty()) { return priorityList.contains(uri); } @@ -1326,8 +1332,9 @@ public class FailoverTransport implements CompositeTransport { private boolean compareURIs(final URI first, final URI second) { + boolean result = false; if (first == null || second == null) { - return false; + return result; } if (first.getPort() == second.getPort()) { @@ -1336,25 +1343,26 @@ public class FailoverTransport implements CompositeTransport { try { firstAddr = InetAddress.getByName(first.getHost()); secondAddr = InetAddress.getByName(second.getHost()); + + if (firstAddr.equals(secondAddr)) { + result = true; + } + } catch(IOException e) { if (firstAddr == null) { - LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e); + LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); } else { - LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e); + LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); } if (first.getHost().equalsIgnoreCase(second.getHost())) { - return true; + result = true; } } - - if (firstAddr.equals(secondAddr)) { - return true; - } } - return false; + return result; } private InputStreamReader getURLStream(String path) throws IOException { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java index d80e6e4477..0176a8fad3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java @@ -16,13 +16,11 @@ */ package org.apache.activemq.transport.failover; -import org.apache.activemq.broker.BrokerService; +import java.util.HashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - public class FailoverPriorityTest extends FailoverClusterTestSupport { protected final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -30,7 +28,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616"; private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617"; private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618"; - private HashMap urls = new HashMap(); + private final HashMap urls = new HashMap(); @Override public void setUp() throws Exception { @@ -42,8 +40,8 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { private static final String BROKER_A_NAME = "BROKERA"; private static final String BROKER_B_NAME = "BROKERB"; private static final String BROKER_C_NAME = "BROKERC"; - - + + public void testPriorityBackup() throws Exception { createBrokerA(); createBrokerB(); @@ -57,7 +55,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { restart(false, BROKER_A_NAME, BROKER_B_NAME); - + for (int i = 0; i < 3; i++) { restart(true, BROKER_A_NAME, BROKER_B_NAME); } @@ -126,7 +124,36 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { restart(true, BROKER_A_NAME, BROKER_B_NAME); } - + + public void testPriorityBackupAndUpdateClients() throws Exception { + // Broker A + addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME)); + addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true); + addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + getBroker(BROKER_A_NAME).start(); + + // Broker B + addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME)); + addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true); + addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null); + getBroker(BROKER_B_NAME).start(); + + getBroker(BROKER_B_NAME).waitUntilStarted(); + Thread.sleep(1000); + + setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false"); + + LOG.info("Client URI will be: " + getClientUrl()); + + createClients(5); + + // Let's wait a little bit longer just in case it takes a while to realize that the + // Broker A is the one with higher priority. + Thread.sleep(5000); + + assertAllConnectedTo(urls.get(BROKER_A_NAME)); + } + private void restart(boolean primary, String primaryName, String secondaryName) throws Exception { Thread.sleep(1000); @@ -159,9 +186,9 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport { Thread.sleep(5000); assertAllConnectedTo(urls.get(primaryName)); - + } - + private void createBrokerByName(String name) throws Exception { if (name.equals(BROKER_A_NAME)) { createBrokerA();