mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-2730 - test to validate there is no implicit limit on reconnects due to the task runner exceeding its iterations, if the task is not complete it re queues it, so at least on trunk, the taskrunner issue is resolved
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1208377 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
44488a5bc3
commit
599abc58bb
|
@ -952,7 +952,8 @@ public class FailoverTransport implements CompositeTransport {
|
|||
|
||||
int reconnectLimit = calculateReconnectAttemptLimit();
|
||||
|
||||
if (reconnectLimit != INFINITE && ++connectFailures >= reconnectLimit) {
|
||||
connectFailures++;
|
||||
if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
|
||||
LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
|
||||
connectionFailure = failure;
|
||||
|
||||
|
@ -1158,6 +1159,10 @@ public class FailoverTransport implements CompositeTransport {
|
|||
return transport.getReceiveCounter();
|
||||
}
|
||||
|
||||
public int getConnectFailures() {
|
||||
return connectFailures;
|
||||
}
|
||||
|
||||
public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
|
||||
synchronized (reconnectMutex) {
|
||||
stateTracker.connectionInterruptProcessingComplete(this, connectionId);
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.state.ConnectionStateTracker;
|
|||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -41,6 +42,7 @@ public class FailoverTransportTest {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
commandsReceived = 0;
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -50,7 +52,40 @@ public class FailoverTransportTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
@Test(timeout = 30000)
|
||||
public void testReconnectUnlimited() throws Exception {
|
||||
|
||||
Transport transport = TransportFactory.connect(
|
||||
new URI("failover://(tcp://0.0.0.0:61616)?useExponentialBackOff=false&reconnectDelay=0&initialReconnectDelay=0"));
|
||||
|
||||
transport.setTransportListener(new TransportListener() {
|
||||
|
||||
public void onCommand(Object command) {
|
||||
commandsReceived++;
|
||||
}
|
||||
|
||||
public void onException(IOException error) {
|
||||
}
|
||||
|
||||
public void transportInterupted() {
|
||||
}
|
||||
|
||||
public void transportResumed() {
|
||||
}
|
||||
});
|
||||
transport.start();
|
||||
|
||||
this.failoverTransport = transport.narrow(FailoverTransport.class);
|
||||
|
||||
assertTrue("no implicit limit of 1000", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return failoverTransport.getConnectFailures() > 1002;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testCommandsIgnoredWhenOffline() throws Exception {
|
||||
this.transport = createTransport();
|
||||
|
||||
|
|
Loading…
Reference in New Issue