diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 50130e00e1..9b86a28f97 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -952,7 +952,8 @@ public class FailoverTransport implements CompositeTransport { int reconnectLimit = calculateReconnectAttemptLimit(); - if (reconnectLimit != INFINITE && ++connectFailures >= reconnectLimit) { + connectFailures++; + if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); connectionFailure = failure; @@ -1158,6 +1159,10 @@ public class FailoverTransport implements CompositeTransport { return transport.getReceiveCounter(); } + public int getConnectFailures() { + return connectFailures; + } + public void connectionInterruptProcessingComplete(ConnectionId connectionId) { synchronized (reconnectMutex) { stateTracker.connectionInterruptProcessingComplete(this, connectionId); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java index 04b3e47e65..7fdb415c30 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java @@ -28,6 +28,7 @@ import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -41,6 +42,7 @@ public class FailoverTransportTest { @Before public void setUp() throws Exception { + commandsReceived = 0; } @After @@ -50,7 +52,40 @@ public class FailoverTransportTest { } } - @Test(timeout=30000) + @Test(timeout = 30000) + public void testReconnectUnlimited() throws Exception { + + Transport transport = TransportFactory.connect( + new URI("failover://(tcp://0.0.0.0:61616)?useExponentialBackOff=false&reconnectDelay=0&initialReconnectDelay=0")); + + transport.setTransportListener(new TransportListener() { + + public void onCommand(Object command) { + commandsReceived++; + } + + public void onException(IOException error) { + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + }); + transport.start(); + + this.failoverTransport = transport.narrow(FailoverTransport.class); + + assertTrue("no implicit limit of 1000", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return failoverTransport.getConnectFailures() > 1002; + } + })); + } + + @Test(timeout=30000) public void testCommandsIgnoredWhenOffline() throws Exception { this.transport = createTransport();