fix intermittent failure with thread accounting test VmTransportNetworkBrokerTest, it did not lie. ci

This commit is contained in:
gtully 2015-06-19 12:34:35 +01:00
parent da3174cf98
commit b22184ebf6
2 changed files with 22 additions and 19 deletions

View File

@ -131,7 +131,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
public void serviceFailed(DiscoveryEvent devent) throws IOException { public void serviceFailed(DiscoveryEvent devent) throws IOException {
final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent; final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
if (sevent.failed.compareAndSet(false, true)) { if (running.get() && sevent.failed.compareAndSet(false, true)) {
listener.onServiceRemove(sevent); listener.onServiceRemove(sevent);
taskRunner.execute(new Runnable() { taskRunner.execute(new Runnable() {

View File

@ -20,8 +20,8 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -31,6 +31,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.bugs.embedded.ThreadExplorer; import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,13 +43,12 @@ public class VmTransportNetworkBrokerTest extends TestCase {
private static final String VM_BROKER_URI = private static final String VM_BROKER_URI =
"vm://localhost?create=false"; "vm://localhost?create=false";
CountDownLatch started = new CountDownLatch(1);
CountDownLatch gotConnection = new CountDownLatch(1);
public void testNoThreadLeak() throws Exception { public void testNoThreadLeak() throws Exception {
// with VMConnection and simple discovery network connector // with VMConnection and simple discovery network connector
int originalThreadCount = Thread.activeCount(); Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
final int originalThreadCount = threads.length;
LOG.debug(ThreadExplorer.show("threads at beginning")); LOG.debug(ThreadExplorer.show("threads at beginning"));
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
@ -67,11 +67,11 @@ public class VmTransportNetworkBrokerTest extends TestCase {
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
int threadCountAfterStart = Thread.activeCount(); int threadCountAfterStart = Thread.activeCount();
TimeUnit.SECONDS.sleep(30); TimeUnit.SECONDS.sleep(20);
int threadCountAfterSleep = Thread.activeCount(); int threadCountAfterSleep = Thread.activeCount();
assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep,
threadCountAfterSleep < threadCountAfterStart + 8); threadCountAfterSleep < 2 * threadCountAfterStart);
connection.close(); connection.close();
broker.stop(); broker.stop();
@ -92,15 +92,18 @@ public class VmTransportNetworkBrokerTest extends TestCase {
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
// let it settle final AtomicInteger threadCountAfterStop = new AtomicInteger();
TimeUnit.SECONDS.sleep(5); boolean ok = Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info(ThreadExplorer.show("active after stop"));
// get final threads but filter out any daemon threads that the JVM may have created.
Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
threadCountAfterStop.set(threads.length);
return threadCountAfterStop.get() <= originalThreadCount;
}
});
// get final threads but filter out any daemon threads that the JVM may have created.
Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
int threadCountAfterStop = threads.length;
// lets see the thread counts at INFO level so they are always in the test log
LOG.info(ThreadExplorer.show("active after stop"));
LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop); LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
assertTrue("Threads are leaking: " + assertTrue("Threads are leaking: " +
@ -108,8 +111,8 @@ public class VmTransportNetworkBrokerTest extends TestCase {
". originalThreadCount=" + ". originalThreadCount=" +
originalThreadCount + originalThreadCount +
" threadCountAfterStop=" + " threadCountAfterStop=" +
threadCountAfterStop, threadCountAfterStop.get(),
threadCountAfterStop <= originalThreadCount); ok);
} }
@ -142,7 +145,7 @@ public class VmTransportNetworkBrokerTest extends TestCase {
Thread thread = threadList.get(i); Thread thread = threadList.get(i);
LOG.debug("Inspecting thread " + thread.getName()); LOG.debug("Inspecting thread " + thread.getName());
if (thread.isDaemon()) { if (thread.isDaemon() && !thread.getName().contains("ActiveMQ")) {
LOG.debug("Removing deamon thread."); LOG.debug("Removing deamon thread.");
threadList.remove(thread); threadList.remove(thread);
Thread.sleep(100); Thread.sleep(100);