This commit is contained in:
Clebert Suconic 2019-09-10 15:02:27 -04:00
commit b583ba7a47
5 changed files with 149 additions and 43 deletions

View File

@ -826,13 +826,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
}
try {
if (clientProtocolManager.waitOnLatch(interval)) {
if (waitForRetry(interval))
return;
}
} catch (InterruptedException ignore) {
throw new ActiveMQInterruptedException(createTrace);
}
// Exponential back-off
long newInterval = (long) (interval * retryIntervalMultiplier);
@ -850,6 +845,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
}
@Override
public boolean waitForRetry(long interval) {
try {
if (clientProtocolManager.waitOnLatch(interval)) {
return true;
}
} catch (InterruptedException ignore) {
throw new ActiveMQInterruptedException(createTrace);
}
return false;
}
private void cancelScheduledTasks() {
Future<?> pingerFutureLocal = pingerFuture;
if (pingerFutureLocal != null) {

View File

@ -60,4 +60,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
ConfirmationWindowWarning getConfirmationWindowWarning();
Lock lockFailover();
boolean waitForRetry(long interval);
}

View File

@ -563,6 +563,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
private int getConnectorsSize() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology();
synchronized (topologyArrayGuard) {
usedTopology = topologyArray;
}
synchronized (this) {
if (usedTopology != null && useTopologyForLoadBalancing) {
return usedTopology.length;
} else {
return initialConnectors.length;
}
}
}
@Override
public void start(Executor executor) throws Exception {
initialize();
@ -764,9 +782,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
ClientSessionFactoryInternal factory = null;
synchronized (this) {
boolean retry;
boolean retry = true;
int attempts = 0;
do {
while (retry && !isClosed()) {
retry = false;
TransportConfiguration tc = selectConnector();
@ -780,31 +798,36 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
try {
addToConnecting(factory);
factory.connect(initialConnectAttempts, failoverOnInitialConnection);
// We always try to connect here with only one attempt,
// as we will perform the initial retry here, looking for all possible connectors
factory.connect(1, false);
} finally {
removeFromConnecting(factory);
}
} catch (ActiveMQException e) {
factory.close();
try {
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
attempts++;
synchronized (topologyArrayGuard) {
int connectorsSize = getConnectorsSize();
int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
if (topologyArray != null && attempts == topologyArray.length) {
if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
if (topologyArray == null && attempts == this.getNumInitialConnectors()) {
if (factory.waitForRetry(retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
}
retry = true;
} else {
throw e;
}
} finally {
factory.close();
}
}
}
while (retry);
}
// ATM topology is never != null. Checking here just to be consistent with

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class InitialConnectionTest extends ActiveMQTestBase {
@Test
public void testInitialInfinite() throws Exception {
AtomicInteger errors = new AtomicInteger(0);
ActiveMQServer server = createServer(false, true);
Thread t = new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
server.start();
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t.start();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=-1&useTopologyForLoadBalancing=true");
connectionFactory.createConnection().close();
connectionFactory.close();
t.join();
Assert.assertEquals(0, errors.get());
}
@Test
public void testNegativeMaxTries() throws Exception {
long timeStart = System.currentTimeMillis();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=2&useTopologyForLoadBalancing=true");
boolean failed = false;
try {
connectionFactory.createConnection();
} catch (JMSException e) {
// expected
failed = true;
}
Assert.assertTrue(failed);
long timeEnd = System.currentTimeMillis();
Assert.assertTrue("3 connectors, at 100 milliseconds each try, initialConnectAttempt=2, it should have waited at least 600 (- 100 from the last try that we don't actually wait, just throw ) milliseconds", timeEnd - timeStart >= 500);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRASession;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
@ -193,6 +194,7 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
spec.setMaxSession(CONSUMER_COUNT);
spec.setSetupAttempts(5);
spec.setSetupInterval(200L);
spec.setRetryInterval(100L);
spec.setReconnectAttempts(reconnectAttempts);
spec.setHA(true); // if this isn't true then the topology listener won't get nodeDown notifications
spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
@ -220,32 +222,25 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
try {
for (int i = 0; i < 10; i++) {
secondaryServer.stop();
long mark = System.currentTimeMillis();
long timeout = 5000;
while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
Thread.sleep(100);
}
assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
Wait.assertTrue(() -> primaryQueue.getConsumerCount() == CONSUMER_COUNT);
secondaryServer.start();
waitForServerToStart(secondaryServer);
secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
mark = System.currentTimeMillis();
while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
Thread.sleep(100);
Queue secondaryQueueRef = secondaryQueue;
Wait.assertTrue(() -> primaryQueue.getConsumerCount() <= CONSUMER_COUNT);
Wait.assertTrue(() -> secondaryQueueRef.getConsumerCount() <= CONSUMER_COUNT);
Wait.assertTrue(() -> primaryQueue.getConsumerCount() + secondaryQueueRef.getConsumerCount() == CONSUMER_COUNT);
}
assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
}
} finally {
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}
}
}