ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
This commit is contained in:
parent
54de31e813
commit
60b62940b9
|
@ -826,13 +826,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
|
||||
}
|
||||
|
||||
try {
|
||||
if (clientProtocolManager.waitOnLatch(interval)) {
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException ignore) {
|
||||
throw new ActiveMQInterruptedException(createTrace);
|
||||
}
|
||||
if (waitForRetry(interval))
|
||||
return;
|
||||
|
||||
// Exponential back-off
|
||||
long newInterval = (long) (interval * retryIntervalMultiplier);
|
||||
|
@ -850,6 +845,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean waitForRetry(long interval) {
|
||||
try {
|
||||
if (clientProtocolManager.waitOnLatch(interval)) {
|
||||
return true;
|
||||
}
|
||||
} catch (InterruptedException ignore) {
|
||||
throw new ActiveMQInterruptedException(createTrace);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void cancelScheduledTasks() {
|
||||
Future<?> pingerFutureLocal = pingerFuture;
|
||||
if (pingerFutureLocal != null) {
|
||||
|
|
|
@ -60,4 +60,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
|
|||
ConfirmationWindowWarning getConfirmationWindowWarning();
|
||||
|
||||
Lock lockFailover();
|
||||
|
||||
boolean waitForRetry(long interval);
|
||||
}
|
||||
|
|
|
@ -563,6 +563,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
}
|
||||
}
|
||||
|
||||
private int getConnectorsSize() {
|
||||
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
|
||||
|
||||
flushTopology();
|
||||
|
||||
synchronized (topologyArrayGuard) {
|
||||
usedTopology = topologyArray;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (usedTopology != null && useTopologyForLoadBalancing) {
|
||||
return usedTopology.length;
|
||||
} else {
|
||||
return initialConnectors.length;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(Executor executor) throws Exception {
|
||||
initialize();
|
||||
|
@ -764,9 +782,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
ClientSessionFactoryInternal factory = null;
|
||||
|
||||
synchronized (this) {
|
||||
boolean retry;
|
||||
boolean retry = true;
|
||||
int attempts = 0;
|
||||
do {
|
||||
while (retry && !isClosed()) {
|
||||
retry = false;
|
||||
|
||||
TransportConfiguration tc = selectConnector();
|
||||
|
@ -780,31 +798,36 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
|
|||
factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
|
||||
try {
|
||||
addToConnecting(factory);
|
||||
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
|
||||
// We always try to connect here with only one attempt,
|
||||
// as we will perform the initial retry here, looking for all possible connectors
|
||||
factory.connect(1, false);
|
||||
} finally {
|
||||
removeFromConnecting(factory);
|
||||
}
|
||||
} catch (ActiveMQException e) {
|
||||
factory.close();
|
||||
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
|
||||
attempts++;
|
||||
try {
|
||||
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
|
||||
attempts++;
|
||||
|
||||
synchronized (topologyArrayGuard) {
|
||||
int connectorsSize = getConnectorsSize();
|
||||
int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
|
||||
|
||||
if (topologyArray != null && attempts == topologyArray.length) {
|
||||
if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
|
||||
}
|
||||
if (topologyArray == null && attempts == this.getNumInitialConnectors()) {
|
||||
if (factory.waitForRetry(retryInterval)) {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
|
||||
}
|
||||
retry = true;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
retry = true;
|
||||
} else {
|
||||
throw e;
|
||||
} finally {
|
||||
|
||||
factory.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
while (retry);
|
||||
}
|
||||
|
||||
// ATM topology is never != null. Checking here just to be consistent with
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.client;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class InitialConnectionTest extends ActiveMQTestBase {
|
||||
|
||||
@Test
|
||||
public void testInitialInfinite() throws Exception {
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
|
||||
ActiveMQServer server = createServer(false, true);
|
||||
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
server.start();
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=-1&useTopologyForLoadBalancing=true");
|
||||
connectionFactory.createConnection().close();
|
||||
connectionFactory.close();
|
||||
|
||||
|
||||
t.join();
|
||||
|
||||
Assert.assertEquals(0, errors.get());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNegativeMaxTries() throws Exception {
|
||||
|
||||
long timeStart = System.currentTimeMillis();
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=2&useTopologyForLoadBalancing=true");
|
||||
boolean failed = false;
|
||||
try {
|
||||
connectionFactory.createConnection();
|
||||
} catch (JMSException e) {
|
||||
// expected
|
||||
failed = true;
|
||||
}
|
||||
Assert.assertTrue(failed);
|
||||
long timeEnd = System.currentTimeMillis();
|
||||
Assert.assertTrue("3 connectors, at 100 milliseconds each try, initialConnectAttempt=2, it should have waited at least 600 (- 100 from the last try that we don't actually wait, just throw ) milliseconds", timeEnd - timeStart >= 500);
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRASession;
|
|||
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
||||
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
|
||||
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
|
||||
|
@ -193,6 +194,7 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
|
|||
spec.setMaxSession(CONSUMER_COUNT);
|
||||
spec.setSetupAttempts(5);
|
||||
spec.setSetupInterval(200L);
|
||||
spec.setRetryInterval(100L);
|
||||
spec.setReconnectAttempts(reconnectAttempts);
|
||||
spec.setHA(true); // if this isn't true then the topology listener won't get nodeDown notifications
|
||||
spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
|
||||
|
@ -220,32 +222,25 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
|
|||
assertNotNull(endpoint.lastMessage);
|
||||
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
secondaryServer.stop();
|
||||
try {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
secondaryServer.stop();
|
||||
|
||||
long mark = System.currentTimeMillis();
|
||||
long timeout = 5000;
|
||||
while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
|
||||
Thread.sleep(100);
|
||||
Wait.assertTrue(() -> primaryQueue.getConsumerCount() == CONSUMER_COUNT);
|
||||
|
||||
secondaryServer.start();
|
||||
waitForServerToStart(secondaryServer);
|
||||
secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
|
||||
|
||||
Queue secondaryQueueRef = secondaryQueue;
|
||||
Wait.assertTrue(() -> primaryQueue.getConsumerCount() <= CONSUMER_COUNT);
|
||||
Wait.assertTrue(() -> secondaryQueueRef.getConsumerCount() <= CONSUMER_COUNT);
|
||||
Wait.assertTrue(() -> primaryQueue.getConsumerCount() + secondaryQueueRef.getConsumerCount() == CONSUMER_COUNT);
|
||||
}
|
||||
|
||||
assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
|
||||
|
||||
secondaryServer.start();
|
||||
waitForServerToStart(secondaryServer);
|
||||
secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
|
||||
|
||||
mark = System.currentTimeMillis();
|
||||
while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
|
||||
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
|
||||
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
|
||||
} finally {
|
||||
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||
qResourceAdapter.stop();
|
||||
}
|
||||
|
||||
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||
qResourceAdapter.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue