additional broker side improvements for https://issues.apache.org/jira/browse/AMQ-2852 - have discovery and network connector and vm async tasks delegate to the the default thread pool executor, serialized the test to ensure shutdown is called once after verification

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1049527 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-12-15 12:44:47 +00:00
parent 738e9a19ea
commit 6348481c18
4 changed files with 34 additions and 58 deletions

View File

@ -25,9 +25,6 @@ import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -68,6 +65,8 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.ResponseCallback;
@ -92,7 +91,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class); private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class);
private static final ThreadPoolExecutor ASYNC_TASKS; private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker; protected final Transport localBroker;
protected final Transport remoteBroker; protected final Transport remoteBroker;
@ -251,7 +250,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected void triggerLocalStartBridge() throws IOException { protected void triggerLocalStartBridge() throws IOException {
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
final String originalName = Thread.currentThread().getName(); final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@ -267,7 +266,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
protected void triggerRemoteStartBridge() throws IOException { protected void triggerRemoteStartBridge() throws IOException {
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
final String originalName = Thread.currentThread().getName(); final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker); Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
@ -391,7 +390,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
try { try {
remoteBridgeStarted.set(false); remoteBridgeStarted.set(false);
final CountDownLatch sendShutdown = new CountDownLatch(1); final CountDownLatch sendShutdown = new CountDownLatch(1);
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
try { try {
localBroker.oneway(new ShutdownInfo()); localBroker.oneway(new ShutdownInfo());
@ -433,7 +432,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error); LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
} }
LOG.debug("The remote Exception was: " + error, error); LOG.debug("The remote Exception was: " + error, error);
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
ServiceSupport.dispose(getControllingService()); ServiceSupport.dispose(getControllingService());
} }
@ -647,7 +646,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (!disposed.get()) { if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error); LOG.debug("The local Exception was:" + error, error);
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
ServiceSupport.dispose(getControllingService()); ServiceSupport.dispose(getControllingService());
} }
@ -674,7 +673,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses // continue removal in separate thread to free up this thread for outstanding responses
ASYNC_TASKS.execute(new Runnable() { asyncTaskRunner.execute(new Runnable() {
public void run() { public void run() {
sub.waitForCompletion(); sub.waitForCompletion();
try { try {
@ -1277,15 +1276,4 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void setBrokerService(BrokerService brokerService) { public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService; this.brokerService = brokerService;
} }
static {
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkBridge");
thread.setDaemon(true);
return thread;
}
});
}
} }

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -39,7 +40,6 @@ import org.apache.commons.logging.LogFactory;
public class SimpleDiscoveryAgent implements DiscoveryAgent { public class SimpleDiscoveryAgent implements DiscoveryAgent {
private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class);
private static final ThreadPoolExecutor ASYNC_TASKS;
private long initialReconnectDelay = 1000; private long initialReconnectDelay = 1000;
private long maxReconnectDelay = 1000 * 30; private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2; private long backOffMultiplier = 2;
@ -110,14 +110,14 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
if (event.failed.compareAndSet(false, true)) { if (event.failed.compareAndSet(false, true)) {
listener.onServiceRemove(event); listener.onServiceRemove(event);
ASYNC_TASKS.execute(new Runnable() { DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() { public void run() {
// We detect a failed connection attempt because the service // We detect a failed connection attempt because the service
// fails right // fails right
// away. // away.
if (event.connectTime + minConnectTime > System.currentTimeMillis()) { if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
LOG.debug("Failure occured soon after the discovery event was generated. It will be clasified as a connection failure: "+event); LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
event.connectFailures++; event.connectFailures++;
@ -132,7 +132,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
return; return;
} }
LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect."); LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
sleepMutex.wait(event.reconnectDelay); sleepMutex.wait(event.reconnectDelay);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -163,7 +163,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
event.failed.set(false); event.failed.set(false);
listener.onServiceAdd(event); listener.onServiceAdd(event);
} }
}); }, "Simple Discovery Agent");
} }
} }
@ -214,16 +214,4 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
public void setUseExponentialBackOff(boolean useExponentialBackOff) { public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff; this.useExponentialBackOff = useExponentialBackOff;
} }
static {
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "Simple Discovery Agent: "+runnable);
thread.setDaemon(true);
return thread;
}
});
}
} }

View File

@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -44,8 +45,6 @@ public class VMTransport implements Transport, Task {
private static final Object DISCONNECT = new Object(); private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0); private static final AtomicLong NEXT_ID = new AtomicLong(0);
// still possible to configure dedicated task runner through system property but not programmatically
private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
protected VMTransport peer; protected VMTransport peer;
protected TransportListener transportListener; protected TransportListener transportListener;
protected boolean disposed; protected boolean disposed;
@ -331,7 +330,7 @@ public class VMTransport implements Transport, Task {
if (async) { if (async) {
synchronized (lazyInitMutext) { synchronized (lazyInitMutext) {
if (taskRunner == null) { if (taskRunner == null) {
taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString()); taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
} }
} }
try { try {

View File

@ -27,8 +27,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.bugs.embedded.ThreadExplorer; import org.apache.activemq.bugs.embedded.ThreadExplorer;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.thread.DefaultThreadPools;
import static org.apache.activemq.thread.DefaultThreadPools.shutdown;
public class VmTransportNetworkBrokerTest extends TestCase { public class VmTransportNetworkBrokerTest extends TestCase {
@ -38,8 +38,10 @@ public class VmTransportNetworkBrokerTest extends TestCase {
CountDownLatch started = new CountDownLatch(1); CountDownLatch started = new CountDownLatch(1);
CountDownLatch gotConnection = new CountDownLatch(1); CountDownLatch gotConnection = new CountDownLatch(1);
public void testNoThreadLeakWithActiveVMConnection() throws Exception { public void testNoThreadLeak() throws Exception {
// with VMConnection and simple discovery network connector
int originalThreadCount = Thread.activeCount();
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setDedicatedTaskRunner(true); broker.setDedicatedTaskRunner(true);
broker.setPersistent(false); broker.setPersistent(false);
@ -55,43 +57,42 @@ public class VmTransportNetworkBrokerTest extends TestCase {
// let it settle // let it settle
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
int threadCount = Thread.activeCount(); int threadCountAfterStart = Thread.activeCount();
TimeUnit.SECONDS.sleep(30); TimeUnit.SECONDS.sleep(30);
int threadCountAfterSleep = Thread.activeCount(); int threadCountAfterSleep = Thread.activeCount();
assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep, assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep,
threadCountAfterSleep < threadCount + 8); threadCountAfterSleep < threadCountAfterStart + 8);
connection.close(); connection.close();
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
}
public void testNoDanglingThreadsAfterStop() throws Exception { // testNoDanglingThreadsAfterStop with tcp transport
int threadCount = Thread.activeCount(); broker = new BrokerService();
BrokerService broker = new BrokerService();
broker.setSchedulerSupport(true); broker.setSchedulerSupport(true);
broker.setDedicatedTaskRunner(true); broker.setDedicatedTaskRunner(true);
broker.setPersistent(false); broker.setPersistent(false);
broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
broker.start(); broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"); cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
Connection connection = cf.createConnection("system", "manager"); connection = cf.createConnection("system", "manager");
connection.start(); connection.start();
connection.close(); connection.close();
broker.stop(); broker.stop();
broker.waitUntilStopped(); broker.waitUntilStopped();
shutdown();
// must only be called when all brokers and connections are done!
DefaultThreadPools.shutdown();
// let it settle // let it settle
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
int threadCountAfterStop = Thread.activeCount(); int threadCountAfterStop = Thread.activeCount();
assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop, assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop,
threadCountAfterStop == threadCount); threadCountAfterStop == originalThreadCount);
} }
} }