From 26945a471693159382348179f456c189f04ab72b Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 3 Feb 2016 21:14:26 -0500 Subject: [PATCH] ARTEMIS-385 On a possible race the Topology final notification may get lost when using many connection factories https://issues.apache.org/jira/browse/ARTEMIS-385 This fix will make sure we only wait for the topologies that are arriving from the current connection over the createFactory method --- .../client/impl/ClientSessionFactoryImpl.java | 26 ++++++- .../impl/ClientSessionFactoryInternal.java | 3 + .../core/client/impl/ServerLocatorImpl.java | 76 ++++++------------- .../artemis/core/client/impl/Topology.java | 2 +- .../cluster/MultipleThreadsOpeningTest.java | 26 ++++++- .../server/ConnectionLimitTest.java | 2 +- 6 files changed, 73 insertions(+), 62 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 5a6241f7a8..384bced55b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -119,6 +120,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final double retryIntervalMultiplier; // For exponential backoff + private final CountDownLatch latchFinalTopology = new CountDownLatch(1); + private final long maxRetryInterval; private int reconnectAttempts; @@ -473,6 +476,18 @@ public class ClientSessionFactoryImpl 
implements ClientSessionFactoryInternal, C interruptConnectAndCloseAllSessions(false); } + @Override + public boolean waitForTopology(long timeout, TimeUnit unit) { + try { + return latchFinalTopology.await(timeout, unit); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e); + return false; + } + } + @Override public boolean isClosed() { return closed || serverLocator.isClosed(); @@ -881,7 +896,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return connection; } else { - connection = establishNewConnection(); + RemotingConnection connection = establishNewConnection(); + + this.connection = connection; //we check if we can actually connect. // we do it here as to receive the reply connection has to be not null @@ -1083,7 +1100,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C transportConnection = openTransportConnection(backupConnector); - if ((transportConnection = openTransportConnection(backupConnector)) != null) { + if (transportConnection != null) { /*looks like the backup is now live, let's use that*/ if (ClientSessionFactoryImpl.isDebug) { @@ -1319,6 +1336,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C String scaleDownGroupName, Pair connectorPair, boolean isLast) { + + if (isLast) { + latchFinalTopology.countDown(); + } + // if it is our connector then set the live id used for failover if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) { liveNodeID = nodeID; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java index ba2dab7694..be98a8e610 100644 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -32,6 +33,8 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory { boolean removeFailureListener(SessionFailureListener listener); + boolean waitForTopology(long timeout, TimeUnit unit); + void disableFinalizeCheck(); String getLiveNodeId(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 4893f6cb85..22a2e0ef26 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -461,11 +461,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery @Override public void resetToInitialConnectors() { - synchronized (topologyArrayGuard) { - receivedTopology = false; - topologyArray = null; - topology.clear(); - } + receivedTopology = false; + topologyArray = null; + topology.clear(); } /* @@ -807,32 +805,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } while (retry); } - synchronized (topologyArrayGuard) { - // We always wait for the topology, as the server - // will send a single element if not cluster - // so clients can know the id of the server they are connected to - final long timeout = System.currentTimeMillis() + callTimeout; - while (!isClosed() && !receivedTopology && timeout > 
System.currentTimeMillis()) { - // Now wait for the topology - try { - topologyArrayGuard.wait(1000); - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - } - - // We are waiting for the topology here, - // however to avoid a race where the connection is closed (and receivedtopology set to true) - // between the wait and this timeout here, we redo the check for timeout. - // if this becomes false there's no big deal and we will just ignore the issue - // notice that we can't add more locks here otherwise there wouldn't be able to avoid a deadlock - final boolean hasTimedOut = timeout > System.currentTimeMillis(); - if (!hasTimedOut && !receivedTopology) { - if (factory != null) - factory.cleanup(); - throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); - } + // ATM topology is never null. Checking here just to be consistent with + // how the sendSubscription happens, + // in case this ever changes. + if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) { + throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); } addFactory(factory); @@ -1457,19 +1434,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery updateArraysAndPairs(); } else { - synchronized (topologyArrayGuard) { - if (topology.isEmpty()) { + if (topology.isEmpty()) { + // Resetting the topology to its original condition as it was brand new + receivedTopology = false; + topologyArray = null; + } + else { + updateArraysAndPairs(); + + if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; - topologyArray = null; - } - else { - updateArraysAndPairs(); - - if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) { - // Resetting the topology to its original condition as it was brand new -
receivedTopology = false; - } } } } @@ -1507,11 +1482,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery updateArraysAndPairs(); if (last) { - synchronized (topologyArrayGuard) { - receivedTopology = true; - // Notify if waiting on getting topology - topologyArrayGuard.notifyAll(); - } + receivedTopology = true; } } @@ -1600,12 +1571,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } if (!clusterConnection && isEmpty) { - // Go back to using the broadcast or static list - synchronized (topologyArrayGuard) { - receivedTopology = false; - - topologyArray = null; - } + receivedTopology = false; + topologyArray = null; } } @@ -1632,6 +1599,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery /** * for tests only and not part of the public interface. Do not use it. + * * @return */ public TransportConfiguration[] getInitialConnectors() { @@ -1892,7 +1860,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return buffer.toString(); } - private void feedInterceptors(final List interceptors, final String interceptorList) { + private void feedInterceptors(final List interceptors, final String interceptorList) { interceptors.clear(); if (interceptorList == null || interceptorList.trim().equals("")) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 526d9f0100..0573b2d9fc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -84,7 +84,7 @@ public final class Topology { /** * It will remove all elements as if it haven't received anyone from the server. 
*/ - public void clear() { + public synchronized void clear() { topology.clear(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java index b98bf07042..6ff3748603 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java @@ -29,12 +29,30 @@ import org.junit.Test; public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { + /** created for https://issues.apache.org/jira/browse/ARTEMIS-385 */ + @Test + public void testRepetitions() throws Exception { + // This test was eventually failing with way over more iterations. + // you might increase it for debugging + final int ITERATIONS = 50; + + + for (int i = 0; i < ITERATIONS; i++) { + System.out.println("#test " + i); + internalMultipleOpen(200, 1); + tearDown(); + setUp(); + } + } + @Test public void testMultipleOpen() throws Exception { - cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1))); + internalMultipleOpen(20, 500); + } - final int numberOfOpens = 500; - int numberOfThreads = 20; + protected void internalMultipleOpen(final int numberOfThreads, final int numberOfOpens) throws Exception { + + cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1))); // I want all the threads aligned, just ready to start creating connections like in a car race final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads); final CountDownLatch flagStartRace = new 
CountDownLatch(1); @@ -55,7 +73,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { flagStartRace.await(); for (int i = 0; i < numberOfOpens; i++) { - if (i % 100 == 0) + if (i > 0 && i % 100 == 0) System.out.println("connections created on Thread " + Thread.currentThread() + " " + i); Connection conn = cf1.createConnection(); Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java index 67ea3ba402..67c478c7fb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ConnectionLimitTest.java @@ -73,9 +73,9 @@ public class ConnectionLimitTest extends ActiveMQTestBase { ServerLocator locator = createNonHALocator(true).setCallTimeout(3000); ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); ClientSession clientSession = addClientSession(clientSessionFactory.createSession()); - ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); try { + ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory(); ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession()); fail("creating a session here should fail"); }