mirror of https://github.com/apache/activemq.git
resolve: https://issues.apache.org/jira/browse/AMQ-2852 - have async close use default thread pool and have inactivity monitor include the task runner in its resource usage such that it can be shutdown. provide accessor to shutdown the default thread pool such that a webapp can cleanup if needed, org.apache.activemq.thread.DefaultThreadPools#shutdown - additional tests to cover same
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1049184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a0c582028
commit
1355cf5810
|
@ -49,4 +49,12 @@ public final class DefaultThreadPools {
|
||||||
return DEFAULT_TASK_RUNNER_FACTORY;
|
return DEFAULT_TASK_RUNNER_FACTORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Useful to cleanup when it is known that all brokers and connections are
|
||||||
|
* close and stopped, eg: when un deploying from web container.
|
||||||
|
*/
|
||||||
|
public static void shutdown() {
|
||||||
|
DEFAULT_TASK_RUNNER_FACTORY.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,8 +41,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
public class InactivityMonitor extends TransportFilter {
|
public class InactivityMonitor extends TransportFilter {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
|
private static final Log LOG = LogFactory.getLog(InactivityMonitor.class);
|
||||||
private static final ThreadPoolExecutor ASYNC_TASKS;
|
private static ThreadPoolExecutor ASYNC_TASKS;
|
||||||
|
|
||||||
private static int CHECKER_COUNTER;
|
private static int CHECKER_COUNTER;
|
||||||
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
|
private static long DEFAULT_CHECK_TIME_MILLS = 30000;
|
||||||
private static Timer READ_CHECK_TIMER;
|
private static Timer READ_CHECK_TIMER;
|
||||||
|
@ -311,6 +310,7 @@ public class InactivityMonitor extends TransportFilter {
|
||||||
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
|
writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
|
||||||
synchronized( InactivityMonitor.class ) {
|
synchronized( InactivityMonitor.class ) {
|
||||||
if( CHECKER_COUNTER == 0 ) {
|
if( CHECKER_COUNTER == 0 ) {
|
||||||
|
ASYNC_TASKS = createExecutor();
|
||||||
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
|
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
|
||||||
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
|
WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
|
||||||
}
|
}
|
||||||
|
@ -354,20 +354,22 @@ public class InactivityMonitor extends TransportFilter {
|
||||||
READ_CHECK_TIMER.cancel();
|
READ_CHECK_TIMER.cancel();
|
||||||
WRITE_CHECK_TIMER = null;
|
WRITE_CHECK_TIMER = null;
|
||||||
READ_CHECK_TIMER = null;
|
READ_CHECK_TIMER = null;
|
||||||
|
ASYNC_TASKS.shutdownNow();
|
||||||
|
ASYNC_TASKS = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ThreadFactory factory = new ThreadFactory() {
|
||||||
|
public Thread newThread(Runnable runnable) {
|
||||||
|
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
static {
|
private ThreadPoolExecutor createExecutor() {
|
||||||
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
|
||||||
public Thread newThread(Runnable runnable) {
|
|
||||||
Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
|
|
||||||
thread.setDaemon(true);
|
|
||||||
return thread;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,9 @@ import org.apache.activemq.wireformat.WireFormat;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of the {@link Transport} interface using raw tcp/ip
|
* An implementation of the {@link Transport} interface using raw tcp/ip
|
||||||
*
|
*
|
||||||
|
@ -55,7 +58,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
*/
|
*/
|
||||||
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
|
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
|
||||||
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
|
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
|
||||||
private static final ThreadPoolExecutor SOCKET_CLOSE;
|
|
||||||
protected final URI remoteLocation;
|
protected final URI remoteLocation;
|
||||||
protected final URI localLocation;
|
protected final URI localLocation;
|
||||||
protected final WireFormat wireFormat;
|
protected final WireFormat wireFormat;
|
||||||
|
@ -516,7 +518,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
//closing the socket can hang also
|
//closing the socket can hang also
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
SOCKET_CLOSE.execute(new Runnable() {
|
getDefaultTaskRunnerFactory().execute(new Runnable() {
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
@ -612,19 +614,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
return super.narrow(target);
|
return super.narrow(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static {
|
|
||||||
SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
|
||||||
public Thread newThread(Runnable runnable) {
|
|
||||||
Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
|
|
||||||
thread.setPriority(Thread.MAX_PRIORITY);
|
|
||||||
thread.setDaemon(true);
|
|
||||||
return thread;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public int getReceiveCounter() {
|
public int getReceiveCounter() {
|
||||||
return receiveCounter;
|
return receiveCounter;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.bugs.embedded.ThreadExplorer;
|
import org.apache.activemq.bugs.embedded.ThreadExplorer;
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
|
|
||||||
|
import static org.apache.activemq.thread.DefaultThreadPools.shutdown;
|
||||||
|
|
||||||
public class VmTransportNetworkBrokerTest extends TestCase {
|
public class VmTransportNetworkBrokerTest extends TestCase {
|
||||||
|
|
||||||
private static final String VM_BROKER_URI =
|
private static final String VM_BROKER_URI =
|
||||||
|
@ -60,10 +62,9 @@ public class VmTransportNetworkBrokerTest extends TestCase {
|
||||||
assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep,
|
assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep,
|
||||||
threadCountAfterSleep < threadCount + 8);
|
threadCountAfterSleep < threadCount + 8);
|
||||||
|
|
||||||
connection.stop();
|
connection.close();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNoDanglingThreadsAfterStop() throws Exception {
|
public void testNoDanglingThreadsAfterStop() throws Exception {
|
||||||
|
@ -73,13 +74,22 @@ public class VmTransportNetworkBrokerTest extends TestCase {
|
||||||
broker.setSchedulerSupport(true);
|
broker.setSchedulerSupport(true);
|
||||||
broker.setDedicatedTaskRunner(true);
|
broker.setDedicatedTaskRunner(true);
|
||||||
broker.setPersistent(false);
|
broker.setPersistent(false);
|
||||||
broker.addConnector("tcp://localhost:61616");
|
broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
|
||||||
|
Connection connection = cf.createConnection("system", "manager");
|
||||||
|
connection.start();
|
||||||
|
connection.close();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
|
shutdown();
|
||||||
|
|
||||||
|
// let it settle
|
||||||
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
int threadCountAfterStop = Thread.activeCount();
|
int threadCountAfterStop = Thread.activeCount();
|
||||||
assertTrue("Threads are leaking: " + ThreadExplorer.show("active afer stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop,
|
assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop,
|
||||||
threadCountAfterStop == threadCount);
|
threadCountAfterStop == threadCount);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue