diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java b/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java index 5af901112d..035dc4e2a3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java @@ -48,5 +48,13 @@ public final class DefaultThreadPools { public static TaskRunnerFactory getDefaultTaskRunnerFactory() { return DEFAULT_TASK_RUNNER_FACTORY; } + + /** + * Useful to cleanup when it is known that all brokers and connections are + * close and stopped, eg: when un deploying from web container. + */ + public static void shutdown() { + DEFAULT_TASK_RUNNER_FACTORY.shutdown(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java index 929d6e9867..50b3ff7bdb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java @@ -41,8 +41,7 @@ import org.apache.commons.logging.LogFactory; public class InactivityMonitor extends TransportFilter { private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); - private static final ThreadPoolExecutor ASYNC_TASKS; - + private static ThreadPoolExecutor ASYNC_TASKS; private static int CHECKER_COUNTER; private static long DEFAULT_CHECK_TIME_MILLS = 30000; private static Timer READ_CHECK_TIMER; @@ -311,6 +310,7 @@ public class InactivityMonitor extends TransportFilter { writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime; synchronized( InactivityMonitor.class ) { if( CHECKER_COUNTER == 0 ) { + ASYNC_TASKS = createExecutor(); READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); } @@ -354,20 +354,22 @@ public class InactivityMonitor extends TransportFilter { READ_CHECK_TIMER.cancel(); WRITE_CHECK_TIMER = null; READ_CHECK_TIMER = null; + ASYNC_TASKS.shutdownNow(); + ASYNC_TASKS = null; } } } } + private ThreadFactory factory = new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); + thread.setDaemon(true); + return thread; + } + }; - static { - ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); - thread.setDaemon(true); - return thread; - } - }); + private ThreadPoolExecutor createExecutor() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), factory); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index fca1ff37ad..27771bb2c0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -47,6 +47,9 @@ import org.apache.activemq.wireformat.WireFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory; + /** * An implementation of the {@link Transport} interface using raw tcp/ip * @@ -55,7 +58,6 @@ import org.apache.commons.logging.LogFactory; */ public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log LOG = LogFactory.getLog(TcpTransport.class); - private static final ThreadPoolExecutor SOCKET_CLOSE; protected final URI remoteLocation; protected final URI localLocation; protected final WireFormat wireFormat; @@ -516,7 +518,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S //closing the socket can hang also final CountDownLatch latch = new CountDownLatch(1); - SOCKET_CLOSE.execute(new Runnable() { + getDefaultTaskRunnerFactory().execute(new Runnable() { public void run() { try { @@ -612,19 +614,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S return super.narrow(target); } - - static { - SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable); - thread.setPriority(Thread.MAX_PRIORITY); - thread.setDaemon(true); - return thread; - } - }); - } - - public int getReceiveCounter() { return receiveCounter; } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java index 464d07b72f..04bebeff81 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java @@ -28,6 +28,8 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.bugs.embedded.ThreadExplorer; import org.apache.activemq.network.NetworkConnector; +import static org.apache.activemq.thread.DefaultThreadPools.shutdown; + public class VmTransportNetworkBrokerTest extends TestCase { private static final String VM_BROKER_URI = @@ -60,10 +62,9 @@ public class VmTransportNetworkBrokerTest extends TestCase { assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCount + 8); - connection.stop(); + connection.close(); broker.stop(); broker.waitUntilStopped(); - } public void testNoDanglingThreadsAfterStop() throws Exception { @@ -73,13 +74,22 @@ public class VmTransportNetworkBrokerTest extends TestCase { broker.setSchedulerSupport(true); broker.setDedicatedTaskRunner(true); broker.setPersistent(false); - broker.addConnector("tcp://localhost:61616"); + broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); + Connection connection = cf.createConnection("system", "manager"); + connection.start(); + connection.close(); broker.stop(); broker.waitUntilStopped(); + shutdown(); + + // let it settle + TimeUnit.SECONDS.sleep(5); int threadCountAfterStop = Thread.activeCount(); - assertTrue("Threads are leaking: " + ThreadExplorer.show("active afer stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop, + assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop, threadCountAfterStop == threadCount); }