diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 47237b5783..4ca1af1125 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -826,13 +826,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier); } - try { - if (clientProtocolManager.waitOnLatch(interval)) { - return; - } - } catch (InterruptedException ignore) { - throw new ActiveMQInterruptedException(createTrace); - } + if (waitForRetry(interval)) + return; // Exponential back-off long newInterval = (long) (interval * retryIntervalMultiplier); @@ -850,6 +845,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } + @Override + public boolean waitForRetry(long interval) { + try { + if (clientProtocolManager.waitOnLatch(interval)) { + return true; + } + } catch (InterruptedException ignore) { + throw new ActiveMQInterruptedException(createTrace); + } + return false; + } + private void cancelScheduledTasks() { Future pingerFutureLocal = pingerFuture; if (pingerFutureLocal != null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java index f72ca5ea28..408c8e7454 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java @@ -60,4 +60,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory { ConfirmationWindowWarning getConfirmationWindowWarning(); Lock lockFailover(); + + boolean waitForRetry(long interval); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index a05a8af123..fd3507e275 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -563,6 +563,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + private int getConnectorsSize() { + Pair[] usedTopology; + + flushTopology(); + + synchronized (topologyArrayGuard) { + usedTopology = topologyArray; + } + + synchronized (this) { + if (usedTopology != null && useTopologyForLoadBalancing) { + return usedTopology.length; + } else { + return initialConnectors.length; + } + } + } + @Override public void start(Executor executor) throws Exception { initialize(); @@ -764,9 +782,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery ClientSessionFactoryInternal factory = null; synchronized (this) { - boolean retry; + boolean retry = true; int attempts = 0; - do { + while (retry && !isClosed()) { retry = false; TransportConfiguration tc = selectConnector(); @@ -780,31 +798,36 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); try { addToConnecting(factory); - factory.connect(initialConnectAttempts, failoverOnInitialConnection); + // We always try to connect here with only one attempt, + // as we will perform the initial retry here, looking for all possible connectors + factory.connect(1, false); } finally { removeFromConnecting(factory); } } catch (ActiveMQException e) { - factory.close(); - if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { - attempts++; + try { + if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { + attempts++; - synchronized (topologyArrayGuard) { + int connectorsSize = getConnectorsSize(); + int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts; - if (topologyArray != null && attempts == topologyArray.length) { + if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } - if (topologyArray == null && attempts == this.getNumInitialConnectors()) { + if (factory.waitForRetry(retryInterval)) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); } + retry = true; + } else { + throw e; } - retry = true; - } else { - throw e; + } finally { + + factory.close(); } } } - while (retry); } // ATM topology is never != null. Checking here just to be consistent with diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java new file mode 100644 index 0000000000..dc93db4b2f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class InitialConnectionTest extends ActiveMQTestBase { + + @Test + public void testInitialInfinite() throws Exception { + AtomicInteger errors = new AtomicInteger(0); + + ActiveMQServer server = createServer(false, true); + + Thread t = new Thread() { + @Override + public void run() { + try { + Thread.sleep(500); + server.start(); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t.start(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=-1&useTopologyForLoadBalancing=true"); + connectionFactory.createConnection().close(); + connectionFactory.close(); + + + t.join(); + + Assert.assertEquals(0, errors.get()); + } + + + @Test + public void testNegativeMaxTries() throws Exception { + + long timeStart = System.currentTimeMillis(); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=2&useTopologyForLoadBalancing=true"); + boolean failed = false; + try { + connectionFactory.createConnection(); + } catch (JMSException e) { + // expected + failed = true; + } + Assert.assertTrue(failed); + long timeEnd = System.currentTimeMillis(); + Assert.assertTrue("3 connectors, at 100 milliseconds each try, initialConnectAttempt=2, it should have waited at least 600 (- 100 from the last try that we don't actually wait, just throw ) milliseconds", timeEnd - timeStart >= 500); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java index 8d886cb54a..a19b79e06a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRASession; import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation; import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Test; public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { @@ -193,6 +194,7 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { spec.setMaxSession(CONSUMER_COUNT); spec.setSetupAttempts(5); spec.setSetupInterval(200L); + spec.setRetryInterval(100L); spec.setReconnectAttempts(reconnectAttempts); spec.setHA(true); // if this isn't true then the topology listener won't get nodeDown notifications spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection @@ -220,32 +222,25 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase { assertNotNull(endpoint.lastMessage); assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test"); - for (int i = 0; i < 10; i++) { - secondaryServer.stop(); + try { + for (int i = 0; i < 10; i++) { + secondaryServer.stop(); - long mark = System.currentTimeMillis(); - long timeout = 5000; - while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) { - Thread.sleep(100); + Wait.assertTrue(() -> primaryQueue.getConsumerCount() == CONSUMER_COUNT); + + secondaryServer.start(); + waitForServerToStart(secondaryServer); + secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE); + + Queue secondaryQueueRef = secondaryQueue; + Wait.assertTrue(() -> primaryQueue.getConsumerCount() <= CONSUMER_COUNT); + Wait.assertTrue(() -> secondaryQueueRef.getConsumerCount() <= CONSUMER_COUNT); + Wait.assertTrue(() -> primaryQueue.getConsumerCount() + secondaryQueueRef.getConsumerCount() == CONSUMER_COUNT); } - - assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT); - - secondaryServer.start(); - waitForServerToStart(secondaryServer); - secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE); - - mark = System.currentTimeMillis(); - while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) { - Thread.sleep(100); - } - - assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT); - assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT); - assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT); + } finally { + qResourceAdapter.endpointDeactivation(endpointFactory, spec); + qResourceAdapter.stop(); } - qResourceAdapter.endpointDeactivation(endpointFactory, spec); - qResourceAdapter.stop(); } }