From b6b69113b1c0eb78f05a95023355e678457b9d51 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 28 Sep 2011 22:07:21 +0000 Subject: [PATCH] apply fixes for https://issues.apache.org/jira/browse/AMQ-3513 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177088 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/failover/FailoverTransport.java | 211 ++++++++++-------- .../failover/FailoverTransportFactory.java | 4 +- .../failover/FailoverConsumerTest.java | 5 +- .../failover/InitalReconnectDelayTest.java | 110 +++++++++ 4 files changed, 229 insertions(+), 101 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 80ace90a46..3e9938c40e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -14,11 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.transport.failover; import java.io.BufferedReader; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; @@ -37,6 +35,7 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; + import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; @@ -60,12 +59,9 @@ import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A Transport that is made reliable by being able to fail over to another * transport when a transport failure is detected. - * - * */ public class FailoverTransport implements CompositeTransport { @@ -114,12 +110,12 @@ public class FailoverTransport implements CompositeTransport { //private boolean connectionInterruptProcessingComplete; private final TransportListener myTransportListener = createTransportListener(); - private boolean updateURIsSupported=true; - private boolean reconnectSupported=true; + private boolean updateURIsSupported = true; + private boolean reconnectSupported = true; // remember for reconnect thread private SslContext brokerSslContext; private String updateURIsURL = null; - private boolean rebalanceUpdateURIs=true; + private boolean rebalanceUpdateURIs = true; private boolean doRebalance = false; public FailoverTransport() throws InterruptedIOException { @@ -130,7 +126,6 @@ public class FailoverTransport implements CompositeTransport { public boolean iterate() { boolean result = false; boolean buildBackup = true; - boolean doReconnect = !disposed; synchronized (backupMutex) { if ((connectedTransport.get() == null || doRebalance) && !disposed) { result = doReconnect(); @@ -170,11 +165,11 @@ public class FailoverTransport implements CompositeTransport { ((Tracked) object).onResponses(command); } } - if (!initialized) { + if (!initialized) { initialized = true; } - - if(command.isConnectionControl()) { + + if (command.isConnectionControl()) { handleConnectionControl((ConnectionControl) command); } if (transportListener != null) { @@ -238,8 +233,7 @@ public class FailoverTransport implements CompositeTransport { connectedTransportURI = null; connected = false; - // notify before any reconnect attempt so ack state can be - // whacked + // notify before any reconnect attempt so ack state can be whacked if (transportListener != null) { transportListener.transportInterupted(); } @@ -292,7 +286,6 @@ public class FailoverTransport implements CompositeTransport { LOG.error("Failed to update transport URI's from: " + newTransports, e); } } - } } } @@ -416,8 +409,7 @@ public class FailoverTransport implements CompositeTransport { } /** - * @param randomize - * The randomize to set. + * @param randomize The randomize to set. */ public void setRandomize(boolean randomize) { this.randomize = randomize; @@ -571,7 +563,6 @@ public class FailoverTransport implements CompositeTransport { // the outer catch throw e; } - } return; @@ -613,9 +604,9 @@ public class FailoverTransport implements CompositeTransport { public void add(boolean rebalance, URI u[]) { boolean newURI = false; - for (int i = 0; i < u.length; i++) { - if (!contains(u[i])) { - uris.add(u[i]); + for (URI uri : u) { + if (!contains(uri)) { + uris.add(uri); newURI = true; } } @@ -625,8 +616,8 @@ public class FailoverTransport implements CompositeTransport { } public void remove(boolean rebalance, URI u[]) { - for (int i = 0; i < u.length; i++) { - uris.remove(u[i]); + for (URI uri : u) { + uris.remove(uri); } // rebalance is automatic if any connected to removed/stopped broker } @@ -634,11 +625,11 @@ public class FailoverTransport implements CompositeTransport { public void add(boolean rebalance, String u) { try { URI newURI = new URI(u); - if (contains(newURI)==false) { + if (contains(newURI) == false) { uris.add(newURI); reconnect(rebalance); } - + } catch (Exception e) { LOG.error("Failed to parse URI: " + u); } @@ -680,7 +671,9 @@ public class FailoverTransport implements CompositeTransport { if (removed) { l.add(failedConnectTransportURI); } - LOG.debug("urlList connectionList:" + l + ", from: " + uris); + if (LOG.isDebugEnabled()) { + LOG.debug("urlList connectionList:" + l + ", from: " + uris); + } return l; } @@ -715,12 +708,11 @@ public class FailoverTransport implements CompositeTransport { cc.setFaultTolerant(true); t.oneway(cc); stateTracker.restore(t); - Map tmpMap = null; + Map tmpMap = null; synchronized (requestMap) { tmpMap = new LinkedHashMap(requestMap); } - for (Iterator iter2 = tmpMap.values().iterator(); iter2.hasNext();) { - Command command = iter2.next(); + for (Command command : tmpMap.values()) { if (LOG.isTraceEnabled()) { LOG.trace("restore requestMap, replay: " + command); } @@ -753,43 +745,49 @@ public class FailoverTransport implements CompositeTransport { return true; } + private void doUpdateURIsFromDisk() { + + // If updateURIsURL is specified, read the file and add any new + // transport URI's to this FailOverTransport. + // Note: Could track file timestamp to avoid unnecessary reading. + String fileURL = getUpdateURIsURL(); + if (fileURL != null) { + BufferedReader in = null; + String newUris = null; + StringBuffer buffer = new StringBuffer(); + + try { + in = new BufferedReader(getURLStream(fileURL)); + while (true) { + String line = in.readLine(); + if (line == null) { + break; + } + buffer.append(line); + } + newUris = buffer.toString(); + } catch (IOException ioe) { + LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ioe) { + // ignore + } + } + } + + processNewTransports(isRebalanceUpdateURIs(), newUris); + } + } + final boolean doReconnect() { Exception failure = null; synchronized (reconnectMutex) { - // If updateURIsURL is specified, read the file and add any new - // transport URI's to this FailOverTransport. - // Note: Could track file timestamp to avoid unnecessary reading. - String fileURL = getUpdateURIsURL(); - if (fileURL != null) { - BufferedReader in = null; - String newUris = null; - StringBuffer buffer = new StringBuffer(); - - try { - in = new BufferedReader(getURLStream(fileURL)); - while (true) { - String line = in.readLine(); - if (line == null) { - break; - } - buffer.append(line); - } - newUris = buffer.toString(); - } catch (IOException ioe) { - LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException ioe) { - // ignore - } - } - } - - processNewTransports(isRebalanceUpdateURIs(), newUris); - } + // First ensure we are up to date. + doUpdateURIsFromDisk(); if (disposed || connectionFailure != null) { reconnectMutex.notifyAll(); @@ -808,14 +806,18 @@ public class FailoverTransport implements CompositeTransport { doRebalance = false; return false; } else { - LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); + if (LOG.isDebugEnabled()) { + LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); + } try { Transport transport = this.connectedTransport.getAndSet(null); if (transport != null) { disposeTransport(transport); } } catch (Exception e) { - LOG.debug("Caught an exception stopping existing transport for rebalance", e); + if (LOG.isDebugEnabled()) { + LOG.debug("Caught an exception stopping existing transport for rebalance", e); + } } } doRebalance = false; @@ -847,12 +849,27 @@ public class FailoverTransport implements CompositeTransport { } } + // Sleep for the reconnectDelay + if (!firstConnection && (reconnectDelay > 0) && !disposed) { + synchronized (sleepMutex) { + LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + try { + sleepMutex.wait(reconnectDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + Iterator iter = connectList.iterator(); while (iter.hasNext() && connectedTransport.get() == null && !disposed) { + URI uri = iter.next(); Transport t = null; try { - LOG.debug("Attempting connect to: " + uri); + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting connect to: " + uri); + } SslContext.setCurrentSslContext(brokerSslContext); t = TransportFactory.compositeConnect(uri); t.setTransportListener(myTransportListener); @@ -924,8 +941,7 @@ public class FailoverTransport implements CompositeTransport { connectionFailure = failure; // Make sure on initial startup, that the transportListener has - // been initialized - // for this instance. + // been initialized for this instance. synchronized (listenerMutex) { if (transportListener == null) { try { @@ -946,14 +962,17 @@ public class FailoverTransport implements CompositeTransport { return false; } } + if (!disposed) { - LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); - synchronized (sleepMutex) { - try { - sleepMutex.wait(reconnectDelay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (reconnectDelay > 0) { + synchronized (sleepMutex) { + LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); + try { + sleepMutex.wait(reconnectDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } @@ -965,6 +984,7 @@ public class FailoverTransport implements CompositeTransport { } } } + return !disposed; } @@ -981,7 +1001,7 @@ public class FailoverTransport implements CompositeTransport { } backups.removeAll(disposedList); disposedList.clear(); - for (Iterator iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) { + for (Iterator iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) { URI uri = iter.next(); if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { try { @@ -1016,23 +1036,23 @@ public class FailoverTransport implements CompositeTransport { } public void reconnect(URI uri) throws IOException { - add(true, new URI[] { uri }); + add(true, new URI[]{uri}); } public boolean isReconnectSupported() { return this.reconnectSupported; } - + public void setReconnectSupported(boolean value) { - this.reconnectSupported=value; + this.reconnectSupported = value; } - + public boolean isUpdateURIsSupported() { return this.updateURIsSupported; } - + public void setUpdateURIsSupported(boolean value) { - this.updateURIsSupported=value; + this.updateURIsSupported = value; } public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { @@ -1041,8 +1061,7 @@ public class FailoverTransport implements CompositeTransport { List add = new ArrayList(); if (updatedURIs != null && updatedURIs.length > 0) { Set set = new HashSet(); - for (int i = 0; i < updatedURIs.length; i++) { - URI uri = updatedURIs[i]; + for (URI uri : updatedURIs) { if (uri != null) { set.add(uri); } @@ -1063,7 +1082,7 @@ public class FailoverTransport implements CompositeTransport { } } } - + /** * @return the updateURIsURL */ @@ -1077,7 +1096,7 @@ public class FailoverTransport implements CompositeTransport { public void setUpdateURIsURL(String updateURIsURL) { this.updateURIsURL = updateURIsURL; } - + /** * @return the rebalanceUpdateURIs */ @@ -1105,32 +1124,32 @@ public class FailoverTransport implements CompositeTransport { stateTracker.connectionInterruptProcessingComplete(this, connectionId); } } - + public ConnectionStateTracker getStateTracker() { return stateTracker; } - + private boolean contains(URI newURI) { boolean result = false; try { - for (URI uri:uris) { - if (newURI.getPort()==uri.getPort()) { - InetAddress newAddr = InetAddress.getByName(newURI.getHost()); - InetAddress addr = InetAddress.getByName(uri.getHost()); - if (addr.equals(newAddr)) { - result = true; - break; + for (URI uri : uris) { + if (newURI.getPort() == uri.getPort()) { + InetAddress newAddr = InetAddress.getByName(newURI.getHost()); + InetAddress addr = InetAddress.getByName(uri.getHost()); + if (addr.equals(newAddr)) { + result = true; + break; + } } } - } - }catch(IOException e) { + } catch (IOException e) { result = true; LOG.error("Failed to verify URI " + newURI + " already known: " + e); } return result; } - + private InputStreamReader getURLStream(String path) throws IOException { InputStreamReader result = null; URL url = null; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java index d898035204..c00b8ad0a1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java @@ -58,7 +58,7 @@ public class FailoverTransportFactory extends TransportFactory { * @throws IOException */ public Transport createTransport(CompositeData compositData) throws IOException { - Map options = compositData.getParameters(); + Map options = compositData.getParameters(); FailoverTransport transport = createTransport(options); if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); @@ -67,7 +67,7 @@ public class FailoverTransportFactory extends TransportFactory { return transport; } - public FailoverTransport createTransport(Map parameters) throws IOException { + public FailoverTransport createTransport(Map parameters) throws IOException { FailoverTransport transport = new FailoverTransport(); IntrospectionSupport.setProperties(transport, parameters); return transport; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java index 61e976f9d2..a921ba9b30 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java @@ -36,7 +36,6 @@ public class FailoverConsumerTest extends NetworkTestSupport { public static final int MSG_COUNT = 100; private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerTest.class); - public void testPublisherFailsOver() throws Exception { // Uncomment this if you want to use remote broker created by // NetworkTestSupport. @@ -72,7 +71,7 @@ public class FailoverConsumerTest extends NetworkTestSupport { // though). // So we must use external broker ant restart it manually. LOG.info("You should restart remote broker now and press enter!"); - System.in.read(); + //System.in.read(); // Thread.sleep(20000); restartRemoteBroker(); msg.acknowledge(); @@ -114,6 +113,6 @@ public class FailoverConsumerTest extends NetworkTestSupport { } protected String getRemoteURI() { - return "tcp://localhost:55555"; + return "tcp://localhost:61616"; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java new file mode 100644 index 0000000000..e41d1ea27a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java @@ -0,0 +1,110 @@ +package org.apache.activemq.transport.failover; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.Date; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertTrue; + +public class InitalReconnectDelayTest { + + private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class); + protected BrokerService broker1; + protected BrokerService broker2; + protected CountDownLatch broker2Started = new CountDownLatch(1); + protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000"; + + @Test + public void testInitialReconnectDelay() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue("foo"); + MessageProducer producer = session.createProducer(destination); + + long start = (new Date()).getTime(); + producer.send(session.createTextMessage("TEST")); + long end = (new Date()).getTime(); + + //Verify we can send quickly + assertTrue((end - start) < 2000); + + //Halt the broker1... + LOG.info("Stopping the Broker1..."); + broker1.stop(); + + LOG.info("Attempting to send... failover should kick in..."); + start = (new Date()).getTime(); + producer.send(session.createTextMessage("TEST")); + end = (new Date()).getTime(); + + //Inital reconnection should kick in and be darned close to what we expected + LOG.info("Failover took " + (end - start) + " ms."); + assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000); + + } + + @Before + public void setUp() throws Exception { + + final String dataDir = "target/data/shared"; + + broker1 = new BrokerService(); + + broker1.setBrokerName("broker1"); + broker1.setDeleteAllMessagesOnStartup(true); + broker1.setDataDirectory(dataDir); + broker1.addConnector("tcp://localhost:62001"); + broker1.setUseJmx(false); + broker1.start(); + broker1.waitUntilStarted(); + + broker2 = new BrokerService(); + broker2.setBrokerName("broker2"); + broker2.setDataDirectory(dataDir); + broker2.setUseJmx(false); + broker2.addConnector("tcp://localhost:62002"); + broker2.start(); + broker2.waitUntilStarted(); + + } + + protected String getSlaveXml() { + return "org/apache/activemq/broker/ft/sharedFileSlave.xml"; + } + + protected String getMasterXml() { + return "org/apache/activemq/broker/ft/sharedFileMaster.xml"; + } + + @After + public void tearDown() throws Exception { + + if (broker1.isStarted()) { + broker1.stop(); + broker1.waitUntilStopped(); + } + + if (broker2.isStarted()) { + broker2.stop(); + broker2.waitUntilStopped(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(uriString); + } + +}