This closes #369

This commit is contained in:
jbertram 2016-02-04 09:46:13 -06:00
commit d499e4d8cb
6 changed files with 73 additions and 62 deletions

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -119,6 +120,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final double retryIntervalMultiplier; // For exponential backoff private final double retryIntervalMultiplier; // For exponential backoff
private final CountDownLatch latchFinalTopology = new CountDownLatch(1);
private final long maxRetryInterval; private final long maxRetryInterval;
private int reconnectAttempts; private int reconnectAttempts;
@ -473,6 +476,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
interruptConnectAndCloseAllSessions(false); interruptConnectAndCloseAllSessions(false);
} }
@Override
public boolean waitForTopology(long timeout, TimeUnit unit) {
try {
return latchFinalTopology.await(timeout, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e);
return false;
}
}
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return closed || serverLocator.isClosed(); return closed || serverLocator.isClosed();
@ -881,7 +896,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return connection; return connection;
} }
else { else {
connection = establishNewConnection(); RemotingConnection connection = establishNewConnection();
this.connection = connection;
//we check if we can actually connect. //we check if we can actually connect.
// we do it here as to receive the reply connection has to be not null // we do it here as to receive the reply connection has to be not null
@ -1083,7 +1100,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
transportConnection = openTransportConnection(backupConnector); transportConnection = openTransportConnection(backupConnector);
if ((transportConnection = openTransportConnection(backupConnector)) != null) { if (transportConnection != null) {
/*looks like the backup is now live, let's use that*/ /*looks like the backup is now live, let's use that*/
if (ClientSessionFactoryImpl.isDebug) { if (ClientSessionFactoryImpl.isDebug) {
@ -1319,6 +1336,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
String scaleDownGroupName, String scaleDownGroupName,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean isLast) { boolean isLast) {
if (isLast) {
latchFinalTopology.countDown();
}
// if it is our connector then set the live id used for failover // if it is our connector then set the live id used for failover
if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) { if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) {
liveNodeID = nodeID; liveNodeID = nodeID;

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.client.impl; package org.apache.activemq.artemis.core.client.impl;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -32,6 +33,8 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
boolean removeFailureListener(SessionFailureListener listener); boolean removeFailureListener(SessionFailureListener listener);
boolean waitForTopology(long timeout, TimeUnit unit);
void disableFinalizeCheck(); void disableFinalizeCheck();
String getLiveNodeId(); String getLiveNodeId();

View File

@ -461,11 +461,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
@Override @Override
public void resetToInitialConnectors() { public void resetToInitialConnectors() {
synchronized (topologyArrayGuard) { receivedTopology = false;
receivedTopology = false; topologyArray = null;
topologyArray = null; topology.clear();
topology.clear();
}
} }
/* /*
@ -807,32 +805,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} while (retry); } while (retry);
} }
synchronized (topologyArrayGuard) { // ATM topology is never != null. Checking here just to be consistent with
// We always wait for the topology, as the server // how the sendSubscription happens.
// will send a single element if not cluster // in case this ever changes.
// so clients can know the id of the server they are connected to if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
final long timeout = System.currentTimeMillis() + callTimeout; throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
while (!isClosed() && !receivedTopology && timeout > System.currentTimeMillis()) {
// Now wait for the topology
try {
topologyArrayGuard.wait(1000);
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
// We are waiting for the topology here,
// however to avoid a race where the connection is closed (and receivedtopology set to true)
// between the wait and this timeout here, we redo the check for timeout.
// if this becomes false there's no big deal and we will just ignore the issue
// notice that we can't add more locks here otherwise there wouldn't be able to avoid a deadlock
final boolean hasTimedOut = timeout > System.currentTimeMillis();
if (!hasTimedOut && !receivedTopology) {
if (factory != null)
factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
} }
addFactory(factory); addFactory(factory);
@ -1457,19 +1434,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
updateArraysAndPairs(); updateArraysAndPairs();
} }
else { else {
synchronized (topologyArrayGuard) { if (topology.isEmpty()) {
if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new
receivedTopology = false;
topologyArray = null;
}
else {
updateArraysAndPairs();
if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
// Resetting the topology to its original condition as it was brand new // Resetting the topology to its original condition as it was brand new
receivedTopology = false; receivedTopology = false;
topologyArray = null;
}
else {
updateArraysAndPairs();
if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) {
// Resetting the topology to its original condition as it was brand new
receivedTopology = false;
}
} }
} }
} }
@ -1507,11 +1482,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
updateArraysAndPairs(); updateArraysAndPairs();
if (last) { if (last) {
synchronized (topologyArrayGuard) { receivedTopology = true;
receivedTopology = true;
// Notify if waiting on getting topology
topologyArrayGuard.notifyAll();
}
} }
} }
@ -1600,12 +1571,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
if (!clusterConnection && isEmpty) { if (!clusterConnection && isEmpty) {
// Go back to using the broadcast or static list receivedTopology = false;
synchronized (topologyArrayGuard) { topologyArray = null;
receivedTopology = false;
topologyArray = null;
}
} }
} }
@ -1632,6 +1599,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
/** /**
* for tests only and not part of the public interface. Do not use it. * for tests only and not part of the public interface. Do not use it.
*
* @return * @return
*/ */
public TransportConfiguration[] getInitialConnectors() { public TransportConfiguration[] getInitialConnectors() {
@ -1892,7 +1860,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return buffer.toString(); return buffer.toString();
} }
private void feedInterceptors(final List<Interceptor> interceptors, final String interceptorList) { private void feedInterceptors(final List<Interceptor> interceptors, final String interceptorList) {
interceptors.clear(); interceptors.clear();
if (interceptorList == null || interceptorList.trim().equals("")) { if (interceptorList == null || interceptorList.trim().equals("")) {

View File

@ -84,7 +84,7 @@ public final class Topology {
/** /**
* It will remove all elements as if it haven't received anyone from the server. * It will remove all elements as if it haven't received anyone from the server.
*/ */
public void clear() { public synchronized void clear() {
topology.clear(); topology.clear();
} }

View File

@ -29,12 +29,30 @@ import org.junit.Test;
public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
/** created for https://issues.apache.org/jira/browse/ARTEMIS-385 */
@Test
public void testRepetitions() throws Exception {
// This test was eventually failing with way over more iterations.
// you might increase it for debugging
final int ITERATIONS = 50;
for (int i = 0; i < ITERATIONS; i++) {
System.out.println("#test " + i);
internalMultipleOpen(200, 1);
tearDown();
setUp();
}
}
@Test @Test
public void testMultipleOpen() throws Exception { public void testMultipleOpen() throws Exception {
cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1))); internalMultipleOpen(20, 500);
}
final int numberOfOpens = 500; protected void internalMultipleOpen(final int numberOfThreads, final int numberOfOpens) throws Exception {
int numberOfThreads = 20;
cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
// I want all the threads aligned, just ready to start creating connections like in a car race // I want all the threads aligned, just ready to start creating connections like in a car race
final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads); final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
final CountDownLatch flagStartRace = new CountDownLatch(1); final CountDownLatch flagStartRace = new CountDownLatch(1);
@ -55,7 +73,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
flagStartRace.await(); flagStartRace.await();
for (int i = 0; i < numberOfOpens; i++) { for (int i = 0; i < numberOfOpens; i++) {
if (i % 100 == 0) if (i > 0 && i % 100 == 0)
System.out.println("connections created on Thread " + Thread.currentThread() + " " + i); System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
Connection conn = cf1.createConnection(); Connection conn = cf1.createConnection();
Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

View File

@ -73,9 +73,9 @@ public class ConnectionLimitTest extends ActiveMQTestBase {
ServerLocator locator = createNonHALocator(true).setCallTimeout(3000); ServerLocator locator = createNonHALocator(true).setCallTimeout(3000);
ClientSessionFactory clientSessionFactory = locator.createSessionFactory(); ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
ClientSession clientSession = addClientSession(clientSessionFactory.createSession()); ClientSession clientSession = addClientSession(clientSessionFactory.createSession());
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
try { try {
ClientSessionFactory extraClientSessionFactory = locator.createSessionFactory();
ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession()); ClientSession extraClientSession = addClientSession(extraClientSessionFactory.createSession());
fail("creating a session here should fail"); fail("creating a session here should fail");
} }