diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 9a13f4155b..7c6e74db80 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -82,6 +82,7 @@ public class FailoverTransport implements CompositeTransport { private long initialReconnectDelay = 10; private long maxReconnectDelay = 1000 * 30; private long backOffMultiplier = 2; + private long timeout = -1; private boolean useExponentialBackOff = true; private boolean randomize = true; private boolean initialized; @@ -318,7 +319,15 @@ public class FailoverTransport implements CompositeTransport { this.maxReconnectAttempts = maxReconnectAttempts; } - /** + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + /** * @return Returns the randomize. */ public boolean isRandomize() { @@ -380,7 +389,7 @@ public class FailoverTransport implements CompositeTransport { try { synchronized (reconnectMutex) { - + if (isShutdownCommand(command) && connectedTransport.get() == null) { if(command.isShutdownInfo()) { // Skipping send of ShutdownInfo command when not connected. @@ -393,19 +402,27 @@ public class FailoverTransport implements CompositeTransport { myTransportListener.onCommand(response); return; } - } + } // Keep trying until the message is sent. for (int i = 0; !disposed; i++) { try { // Wait for transport to be connected. Transport transport = connectedTransport.get(); + long start = System.currentTimeMillis(); + boolean timedout = false; while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) { LOG.trace("Waiting for transport to reconnect."); + long end = System.currentTimeMillis(); + if (timeout > 0 && (end - start > timeout)) { + timedout = true; + LOG.info("Failover timed out after " + (end - start) + "ms"); + break; + } try { - reconnectMutex.wait(1000); + reconnectMutex.wait(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.debug("Interupted: " + e, e); @@ -420,7 +437,9 @@ public class FailoverTransport implements CompositeTransport { error = new IOException("Transport disposed."); } else if (connectionFailure != null) { error = connectionFailure; - } else { + } else if (timedout == true) { + error = new IOException("Failover timeout of " + timeout + " ms reached."); + }else { error = new IOException("Unexpected failure."); } break; @@ -632,7 +651,6 @@ public class FailoverTransport implements CompositeTransport { } final boolean doReconnect() { - Exception failure = null; synchronized (reconnectMutex) { @@ -724,7 +742,7 @@ public class FailoverTransport implements CompositeTransport { } } } - + if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) { LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); connectionFailure = failure; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java new file mode 100644 index 0000000000..7828e05c5d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java @@ -0,0 +1,58 @@ +package org.apache.activemq.transport.failover; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; + +public class FailoverTimeoutTest extends TestCase { + + private static final String QUEUE_NAME = "test.failovertimeout"; + + public void testTimeout() throws Exception { + + long timeout = 1000; + URI tcpUri = new URI("tcp://localhost:61616"); + BrokerService bs = new BrokerService(); + bs.setUseJmx(false); + bs.addConnector(tcpUri); + bs.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session + .createQueue(QUEUE_NAME)); + TextMessage message = session.createTextMessage("Test message"); + producer.send(message); + + bs.stop(); + + try { + producer.send(message); + } catch (JMSException jmse) { + jmse.printStackTrace(); + assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage()); + } + + bs = new BrokerService(); + + bs.setUseJmx(false); + bs.addConnector(tcpUri); + bs.start(); + + producer.send(message); + + bs.stop(); + } + +}