diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index bed47b7a21..c979246f04 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -561,32 +561,34 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery clusterTransportConfiguration = locator.clusterTransportConfiguration; } - private synchronized TransportConfiguration selectConnector() { + private TransportConfiguration selectConnector() { Pair[] usedTopology; synchronized (topologyArrayGuard) { usedTopology = topologyArray; } - // if the topologyArray is null, we will use the initialConnectors - if (usedTopology != null) { - if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy."); + synchronized (this) { + // if the topologyArray is null, we will use the initialConnectors + if (usedTopology != null) { + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy."); + } + int pos = loadBalancingPolicy.select(usedTopology.length); + Pair pair = usedTopology[pos]; + + return pair.getA(); } - int pos = loadBalancingPolicy.select(usedTopology.length); - Pair pair = usedTopology[pos]; + else { + // Get from initialconnectors + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { + ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors."); + } - return pair.getA(); - } - else { - // Get from initialconnectors - if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { - ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors."); + int pos = loadBalancingPolicy.select(initialConnectors.length); + + return initialConnectors[pos]; } - - int pos = loadBalancingPolicy.select(initialConnectors.length); - - return initialConnectors[pos]; } } @@ -637,16 +639,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException { + ClientSessionFactoryInternal returnFactory = null; + synchronized (this) { // static list of initial connectors if (getNumInitialConnectors() > 0 && discoveryGroup == null) { - ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings); - addFactory(sf); - return sf; + returnFactory = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings); } } - // wait for discovery group to get the list of initial connectors - return (ClientSessionFactoryInternal) createSessionFactory(); + + if (returnFactory != null) { + addFactory(returnFactory); + return returnFactory; + } + else { + // wait for discovery group to get the list of initial connectors + return (ClientSessionFactoryInternal) createSessionFactory(); + } } @Override @@ -844,11 +853,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery factory.cleanup(); throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } - - addFactory(factory); - - return factory; } + + addFactory(factory); + + return factory; } public boolean isHA() { @@ -1494,16 +1503,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } public void factoryClosed(final ClientSessionFactory factory) { + boolean isEmpty; synchronized (factories) { factories.remove(factory); + isEmpty = factories.isEmpty(); + } - if (!clusterConnection && factories.isEmpty()) { - // Go back to using the broadcast or static list - synchronized (topologyArrayGuard) { - receivedTopology = false; + if (!clusterConnection && isEmpty) { + // Go back to using the broadcast or static list + synchronized (topologyArrayGuard) { + receivedTopology = false; - topologyArray = null; - } + topologyArray = null; } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java index 47669daf2d..6475a1334f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java @@ -23,8 +23,8 @@ import java.util.concurrent.CountDownLatch; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; -import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; +import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase; import org.junit.Test; public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { @@ -33,7 +33,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { public void testMultipleOpen() throws Exception { cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1))); - final int numberOfOpens = 2000; + final int numberOfOpens = 500; int numberOfThreads = 20; // I want all the threads aligned, just ready to start creating connections like in a car race final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads); @@ -41,6 +41,10 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { class ThreadOpen extends Thread { + ThreadOpen(int i) { + super("MultipleThreadsOpeningTest/ThreadOpen::" + i); + } + int errors = 0; public void run() { @@ -50,8 +54,8 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { flagStartRace.await(); for (int i = 0; i < numberOfOpens; i++) { - if (i % 1000 == 0) - System.out.println("tests " + i); + if (i % 100 == 0) + System.out.println("connections created on Thread " + Thread.currentThread() + " " + i); Connection conn = cf1.createConnection(); Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); sess.close(); @@ -68,18 +72,27 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { ThreadOpen[] threads = new ThreadOpen[numberOfThreads]; for (int i = 0; i < numberOfThreads; i++) { - threads[i] = new ThreadOpen(); + threads[i] = new ThreadOpen(i); threads[i].start(); } flagAlignSemaphore.await(); flagStartRace.countDown(); - for (ThreadOpen t : threads) { - // 5 minutes seems long but this may take a bit of time in a slower box - t.join(300000); - assertFalse(t.isAlive()); - assertEquals("There are Errors on the test thread", 0, t.errors); + try { + for (ThreadOpen t : threads) { + t.join(60000); + assertFalse(t.isAlive()); + assertEquals("There are Errors on the test thread", 0, t.errors); + } + } + finally { + for (ThreadOpen t : threads) { + if (t.isAlive()) { + t.interrupt(); + } + t.join(1000); + } } } }