ARTEMIS-222 fixing a deadlock that appeared on the testsuite (MultipleThreadsOpeningTest)

https://issues.apache.org/jira/browse/ARTEMIS-222
This commit is contained in:
Clebert Suconic 2015-09-10 16:22:38 -04:00
parent d5a01287a5
commit f5a727259e
2 changed files with 66 additions and 42 deletions

View File

@ -561,13 +561,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration; clusterTransportConfiguration = locator.clusterTransportConfiguration;
} }
private synchronized TransportConfiguration selectConnector() { private TransportConfiguration selectConnector() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology; Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
synchronized (topologyArrayGuard) { synchronized (topologyArrayGuard) {
usedTopology = topologyArray; usedTopology = topologyArray;
} }
synchronized (this) {
// if the topologyArray is null, we will use the initialConnectors // if the topologyArray is null, we will use the initialConnectors
if (usedTopology != null) { if (usedTopology != null) {
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
@ -589,6 +590,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return initialConnectors[pos]; return initialConnectors[pos];
} }
} }
}
public void start(Executor executor) throws Exception { public void start(Executor executor) throws Exception {
initialise(); initialise();
@ -637,17 +639,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException { private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
ClientSessionFactoryInternal returnFactory = null;
synchronized (this) { synchronized (this) {
// static list of initial connectors // static list of initial connectors
if (getNumInitialConnectors() > 0 && discoveryGroup == null) { if (getNumInitialConnectors() > 0 && discoveryGroup == null) {
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings); returnFactory = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
addFactory(sf);
return sf;
} }
} }
if (returnFactory != null) {
addFactory(returnFactory);
return returnFactory;
}
else {
// wait for discovery group to get the list of initial connectors // wait for discovery group to get the list of initial connectors
return (ClientSessionFactoryInternal) createSessionFactory(); return (ClientSessionFactoryInternal) createSessionFactory();
} }
}
@Override @Override
public ClientSessionFactoryInternal connectNoWarnings() throws ActiveMQException { public ClientSessionFactoryInternal connectNoWarnings() throws ActiveMQException {
@ -844,12 +853,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
factory.cleanup(); factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup); throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
} }
}
addFactory(factory); addFactory(factory);
return factory; return factory;
} }
}
public boolean isHA() { public boolean isHA() {
return ha; return ha;
@ -1494,10 +1503,13 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
public void factoryClosed(final ClientSessionFactory factory) { public void factoryClosed(final ClientSessionFactory factory) {
boolean isEmpty;
synchronized (factories) { synchronized (factories) {
factories.remove(factory); factories.remove(factory);
isEmpty = factories.isEmpty();
}
if (!clusterConnection && factories.isEmpty()) { if (!clusterConnection && isEmpty) {
// Go back to using the broadcast or static list // Go back to using the broadcast or static list
synchronized (topologyArrayGuard) { synchronized (topologyArrayGuard) {
receivedTopology = false; receivedTopology = false;
@ -1506,7 +1518,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
} }
} }
}
public Topology getTopology() { public Topology getTopology() {
return topology; return topology;

View File

@ -23,8 +23,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.Test; import org.junit.Test;
public class MultipleThreadsOpeningTest extends JMSClusteredTestBase { public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
@ -33,7 +33,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
public void testMultipleOpen() throws Exception { public void testMultipleOpen() throws Exception {
cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1))); cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
final int numberOfOpens = 2000; final int numberOfOpens = 500;
int numberOfThreads = 20; int numberOfThreads = 20;
// I want all the threads aligned, just ready to start creating connections like in a car race // I want all the threads aligned, just ready to start creating connections like in a car race
final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads); final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
@ -41,6 +41,10 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
class ThreadOpen extends Thread { class ThreadOpen extends Thread {
ThreadOpen(int i) {
super("MultipleThreadsOpeningTest/ThreadOpen::" + i);
}
int errors = 0; int errors = 0;
public void run() { public void run() {
@ -50,8 +54,8 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
flagStartRace.await(); flagStartRace.await();
for (int i = 0; i < numberOfOpens; i++) { for (int i = 0; i < numberOfOpens; i++) {
if (i % 1000 == 0) if (i % 100 == 0)
System.out.println("tests " + i); System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
Connection conn = cf1.createConnection(); Connection conn = cf1.createConnection();
Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
sess.close(); sess.close();
@ -68,18 +72,27 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
ThreadOpen[] threads = new ThreadOpen[numberOfThreads]; ThreadOpen[] threads = new ThreadOpen[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
threads[i] = new ThreadOpen(); threads[i] = new ThreadOpen(i);
threads[i].start(); threads[i].start();
} }
flagAlignSemaphore.await(); flagAlignSemaphore.await();
flagStartRace.countDown(); flagStartRace.countDown();
try {
for (ThreadOpen t : threads) { for (ThreadOpen t : threads) {
// 5 minutes seems long but this may take a bit of time in a slower box t.join(60000);
t.join(300000);
assertFalse(t.isAlive()); assertFalse(t.isAlive());
assertEquals("There are Errors on the test thread", 0, t.errors); assertEquals("There are Errors on the test thread", 0, t.errors);
} }
} }
finally {
for (ThreadOpen t : threads) {
if (t.isAlive()) {
t.interrupt();
}
t.join(1000);
}
}
}
} }