Clean up test start / stop to streamline things and make the tests run
at a bit more reasonable speed.
This commit is contained in:
Timothy Bish 2015-02-20 11:41:23 -05:00
parent 2c92c34132
commit a2b78fdeb0
2 changed files with 17 additions and 81 deletions

View File

@ -20,12 +20,8 @@ import java.io.File;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Set; import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
@ -59,6 +55,9 @@ public class AmqpTestSupport {
@Rule public TestName name = new TestName(); @Rule public TestName name = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class); protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class);
protected ExecutorService testService = Executors.newSingleThreadExecutor();
protected BrokerService brokerService; protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>(); protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages; protected int numberOfMessages;
@ -82,19 +81,8 @@ public class AmqpTestSupport {
public void setUp() throws Exception { public void setUp() throws Exception {
LOG.info("========== start " + getTestName() + " =========="); LOG.info("========== start " + getTestName() + " ==========");
exceptions.clear(); exceptions.clear();
if (killHungThreads("setUp")) {
LOG.warn("HUNG THREADS in setUp");
}
ExecutorService executor = Executors.newSingleThreadExecutor(); startBroker();
Future<Boolean> future = executor.submit(new SetUpTask());
try {
LOG.debug("SetUpTask started.");
future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new Exception("startBroker timed out");
}
executor.shutdownNow();
this.numberOfMessages = 2000; this.numberOfMessages = 2000;
} }
@ -218,42 +206,8 @@ public class AmqpTestSupport {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
stopBroker();
LOG.info("========== tearDown " + getTestName() + " =========="); LOG.info("========== tearDown " + getTestName() + " ==========");
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = executor.submit(new TearDownTask());
try {
LOG.debug("tearDown started.");
future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new Exception("stopBroker timed out");
} finally {
executor.shutdownNow();
if (killHungThreads("tearDown")) {
LOG.warn("HUNG THREADS in tearDown");
}
}
}
private boolean killHungThreads(String stage) throws Exception{
Thread.sleep(500);
if (Thread.activeCount() == 1) {
return false;
}
LOG.warn("Hung Thread(s) on {} entry threadCount {} ", stage, Thread.activeCount());
Thread[] threads = new Thread[Thread.activeCount()];
Thread.enumerate(threads);
for (int i=0; i < threads.length; i++) {
Thread t = threads[i];
if (!t.getName().equals("main")) {
LOG.warn("KillHungThreads: Interrupting thread {}", t.getName());
t.interrupt();
}
}
LOG.warn("Hung Thread on {} exit threadCount {} ", stage, Thread.activeCount());
return true;
} }
public void sendMessages(Connection connection, Destination destination, int count) throws Exception { public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
@ -310,30 +264,4 @@ public class AmqpTestSupport {
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy; return proxy;
} }
public class SetUpTask implements Callable<Boolean> {
@SuppressWarnings("unused")
private String testName;
@Override
public Boolean call() throws Exception {
LOG.debug("in SetUpTask.call, calling startBroker");
startBroker();
return Boolean.TRUE;
}
}
public class TearDownTask implements Callable<Boolean> {
@SuppressWarnings("unused")
private String testName;
@Override
public Boolean call() throws Exception {
LOG.debug("in TearDownTask.call(), calling stopBroker");
stopBroker();
return Boolean.TRUE;
}
}
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.transport.amqp; package org.apache.activemq.transport.amqp;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,18 +34,26 @@ public class JMSClientTestSupport extends AmqpTestSupport {
protected Connection connection; protected Connection connection;
private Thread connectionCloseThread;
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Boolean> future = testService.submit(new CloseConnectionTask());
Future<Boolean> future = executor.submit(new CloseConnectionTask());
try { try {
LOG.debug("tearDown started."); LOG.debug("tearDown started.");
future.get(60, TimeUnit.SECONDS); future.get(60, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
if (connectionCloseThread != null) {
connectionCloseThread.interrupt();;
}
testService.shutdownNow();
testService = Executors.newSingleThreadExecutor();
throw new Exception("CloseConnection timed out"); throw new Exception("CloseConnection timed out");
} finally { } finally {
executor.shutdownNow(); connectionCloseThread = null;
connection = null;
super.tearDown(); super.tearDown();
} }
} }
@ -55,6 +62,7 @@ public class JMSClientTestSupport extends AmqpTestSupport {
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
if (connection != null) { if (connection != null) {
connectionCloseThread = Thread.currentThread();
LOG.debug("in CloseConnectionTask.call(), calling connection.close()"); LOG.debug("in CloseConnectionTask.call(), calling connection.close()");
connection.close(); connection.close();
} }