AMQ-3451: Ensure thread pools is shutdown properly to avoid any leaks. Do not use the old @deprecated thread pool.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1381985 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Claus Ibsen 2012-09-07 11:58:21 +00:00
parent 8530111cad
commit 8a01c5d16a
10 changed files with 160 additions and 113 deletions

View File

@ -52,7 +52,6 @@ import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState; import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState; import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
@ -117,6 +116,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge; private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory; private final TaskRunnerFactory taskRunnerFactory;
private final TaskRunnerFactory stopTaskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId; private String duplexNetworkConnectorId;
@ -125,18 +125,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
/** /**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
* else commands are sent async. * else commands are sent async.
* @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
*/ */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
TaskRunnerFactory taskRunnerFactory) { TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
this.connector = connector; this.connector = connector;
this.broker = broker; this.broker = broker;
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates(); brokerConnectionStates = rb.getConnectionStates();
if (connector != null) { if (connector != null) {
this.statistics.setParent(connector.getStatistics()); this.statistics.setParent(connector.getStatistics());
this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
} }
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory = taskRunnerFactory;
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
this.transport = transport; this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() { this.transport.setTransportListener(new DefaultTransportListener() {
@Override @Override
@ -939,6 +941,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
public void stop() throws Exception { public void stop() throws Exception {
// do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
// as their lifecycle is handled elsewhere
stopAsync(); stopAsync();
while (!stopped.await(5, TimeUnit.SECONDS)) { while (!stopped.await(5, TimeUnit.SECONDS)) {
LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
@ -952,7 +957,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
stopError = cause; stopError = cause;
} }
try { try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { stopTaskRunnerFactory.execute(new Runnable() {
public void run() { public void run() {
try { try {
Thread.sleep(waitTime); Thread.sleep(waitTime);
@ -961,9 +966,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
} }
}, "delayedStop:" + transport.getRemoteAddress()); });
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("cannot create stopAsync :", t); LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
} }
} }
} }
@ -988,7 +993,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
try { try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { stopTaskRunnerFactory.execute(new Runnable() {
public void run() { public void run() {
serviceLock.writeLock().lock(); serviceLock.writeLock().lock();
try { try {
@ -1000,9 +1005,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
serviceLock.writeLock().unlock(); serviceLock.writeLock().unlock();
} }
} }
}, "StopAsync:" + transport.getRemoteAddress()); });
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
stopped.countDown(); stopped.countDown();
} }
} }
@ -1013,8 +1018,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return "Transport Connection to: " + transport.getRemoteAddress(); return "Transport Connection to: " + transport.getRemoteAddress();
} }
protected void doStop() throws Exception, InterruptedException { protected void doStop() throws Exception {
LOG.debug("Stopping connection: " + transport.getRemoteAddress()); LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
connector.onStopped(this); connector.onStopped(this);
try { try {
synchronized (this) { synchronized (this) {
@ -1026,16 +1031,17 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
} catch (Exception ignore) { } catch (Exception ignore) {
LOG.trace("Exception caught stopping", ignore); LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
} }
try { try {
transport.stop(); transport.stop();
LOG.debug("Stopped transport: " + transport.getRemoteAddress()); LOG.debug("Stopped transport: " + transport.getRemoteAddress());
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e); LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
} }
if (taskRunner != null) { if (taskRunner != null) {
taskRunner.shutdown(1); taskRunner.shutdown(1);
taskRunner = null;
} }
active = false; active = false;
// Run the MessageDispatch callbacks so that message references get // Run the MessageDispatch callbacks so that message references get
@ -1063,14 +1069,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
try { try {
LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
processRemoveConnection(cs.getInfo().getConnectionId(), 0l); processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
} catch (Throwable ignore) { } catch (Throwable ignore) {
ignore.printStackTrace(); ignore.printStackTrace();
} }
} }
} }
LOG.debug("Connection Stopped: " + getRemoteAddress()); LOG.debug("Connection Stopped: {}", getRemoteAddress());
} }
/** /**

View File

@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
@ -220,7 +219,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
getServer().setAcceptListener(new TransportAcceptListener() { getServer().setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) { public void onAccept(final Transport transport) {
try { try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() { public void run() {
try { try {
Connection connection = createConnection(transport); Connection connection = createConnection(transport);
@ -310,8 +309,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected Connection createConnection(Transport transport) throws IOException { protected Connection createConnection(Transport transport) throws IOException {
// prefer to use task runner from broker service as stop task runner, as we can then
// tie it to the lifecycle of the broker service
TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
: taskRunnerFactory); : taskRunnerFactory, brokerService.getTaskRunnerFactory());
boolean statEnabled = this.getStatistics().isEnabled(); boolean statEnabled = this.getStatistics().isEnabled();
answer.getStatistics().setEnabled(statEnabled); answer.getStatistics().setEnabled(statEnabled);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);

View File

@ -49,9 +49,10 @@ public class ManagedTransportConnection extends TransportConnection {
private final boolean populateUserName; private final boolean populateUserName;
public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
TaskRunnerFactory factory, ManagementContext context, ObjectName connectorName) TaskRunnerFactory factory, TaskRunnerFactory stopFactory,
ManagementContext context, ObjectName connectorName)
throws IOException { throws IOException {
super(connector, transport, broker, factory); super(connector, transport, broker, factory, stopFactory);
this.managementContext = context; this.managementContext = context;
this.connectorName = connectorName; this.connectorName = connectorName;
this.mbean = new ConnectionView(this, managementContext); this.mbean = new ConnectionView(this, managementContext);

View File

@ -49,7 +49,10 @@ public class ManagedTransportConnector extends TransportConnector {
} }
protected Connection createConnection(Transport transport) throws IOException { protected Connection createConnection(Transport transport) throws IOException {
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(), managementContext, connectorName); // prefer to use task runner from broker service as stop task runner, as we can then
// tie it to the lifecycle of the broker service
return new ManagedTransportConnection(this, transport, getBroker(), isDisableAsyncDispatch() ? null : getTaskRunnerFactory(),
getBrokerService().getTaskRunnerFactory(), managementContext, connectorName);
} }
protected static synchronized long getNextConnectionId() { protected static synchronized long getNextConnectionId() {

View File

@ -46,9 +46,9 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.state.Tracked; import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
@ -86,6 +86,7 @@ public class FailoverTransport implements CompositeTransport {
private URI connectedTransportURI; private URI connectedTransportURI;
private URI failedConnectTransportURI; private URI failedConnectTransportURI;
private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
private final TaskRunnerFactory reconnectTaskFactory;
private final TaskRunner reconnectTask; private final TaskRunner reconnectTask;
private boolean started; private boolean started;
private boolean initialized; private boolean initialized;
@ -128,7 +129,9 @@ public class FailoverTransport implements CompositeTransport {
brokerSslContext = SslContext.getCurrentSslContext(); brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true); stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async. // Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
public boolean iterate() { public boolean iterate() {
boolean result = false; boolean result = false;
if (!started) { if (!started) {
@ -345,26 +348,31 @@ public class FailoverTransport implements CompositeTransport {
Transport transportToStop = null; Transport transportToStop = null;
List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
synchronized (reconnectMutex) { try {
if (LOG.isDebugEnabled()) { synchronized (reconnectMutex) {
LOG.debug("Stopped " + this); if (LOG.isDebugEnabled()) {
} LOG.debug("Stopped " + this);
if (!started) { }
return; if (!started) {
} return;
started = false; }
disposed = true; started = false;
connected = false; disposed = true;
connected = false;
if (connectedTransport.get() != null) { if (connectedTransport.get() != null) {
transportToStop = connectedTransport.getAndSet(null); transportToStop = connectedTransport.getAndSet(null);
}
reconnectMutex.notifyAll();
} }
reconnectMutex.notifyAll(); synchronized (sleepMutex) {
sleepMutex.notifyAll();
}
} finally {
reconnectTask.shutdown();
reconnectTaskFactory.shutdownNow();
} }
synchronized (sleepMutex) {
sleepMutex.notifyAll();
}
reconnectTask.shutdown();
synchronized(backupMutex) { synchronized(backupMutex) {
for (BackupTransport backup : backups) { for (BackupTransport backup : backups) {
backup.setDisposed(true); backup.setDisposed(true);

View File

@ -30,9 +30,9 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
@ -63,6 +63,7 @@ public class FanoutTransport implements CompositeTransport {
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
private final TaskRunnerFactory reconnectTaskFactory;
private final TaskRunner reconnectTask; private final TaskRunner reconnectTask;
private boolean started; private boolean started;
@ -157,7 +158,9 @@ public class FanoutTransport implements CompositeTransport {
public FanoutTransport() throws InterruptedIOException { public FanoutTransport() throws InterruptedIOException {
// Setup a task that is used to reconnect the a connection async. // Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
public boolean iterate() { public boolean iterate() {
return doConnect(); return doConnect();
} }
@ -291,27 +294,31 @@ public class FanoutTransport implements CompositeTransport {
} }
public void stop() throws Exception { public void stop() throws Exception {
synchronized (reconnectMutex) { try {
ServiceStopper ss = new ServiceStopper(); synchronized (reconnectMutex) {
ServiceStopper ss = new ServiceStopper();
if (!started) { if (!started) {
return; return;
}
started = false;
disposed = true;
connected=false;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.transport != null) {
ss.stop(th.transport);
} }
} started = false;
disposed = true;
connected=false;
LOG.debug("Stopped: " + this); for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
ss.throwFirstException(); FanoutTransportHandler th = iter.next();
if (th.transport != null) {
ss.stop(th.transport);
}
}
LOG.debug("Stopped: " + this);
ss.throwFirstException();
}
} finally {
reconnectTask.shutdown();
reconnectTaskFactory.shutdownNow();
} }
reconnectTask.shutdown();
} }
public int getMinAckCount() { public int getMinAckCount() {

View File

@ -37,7 +37,7 @@ import javax.net.ssl.SSLSession;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
@ -52,9 +52,10 @@ public class NIOSSLTransport extends NIOTransport {
protected SSLEngine sslEngine; protected SSLEngine sslEngine;
protected SSLSession sslSession; protected SSLSession sslSession;
protected boolean handshakeInProgress = false; protected volatile boolean handshakeInProgress = false;
protected SSLEngineResult.Status status = null; protected SSLEngineResult.Status status = null;
protected SSLEngineResult.HandshakeStatus handshakeStatus = null; protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
protected TaskRunnerFactory taskRunnerFactory;
public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation); super(wireFormat, socketFactory, remoteLocation, localLocation);
@ -259,7 +260,7 @@ public class NIOSSLTransport extends NIOTransport {
case NEED_TASK: case NEED_TASK:
Runnable task; Runnable task;
while ((task = sslEngine.getDelegatedTask()) != null) { while ((task = sslEngine.getDelegatedTask()) != null) {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(task); taskRunnerFactory.execute(task);
} }
break; break;
case NEED_WRAP: case NEED_WRAP:
@ -273,8 +274,19 @@ public class NIOSSLTransport extends NIOTransport {
} }
} }
@Override
protected void doStart() throws Exception {
taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
// no need to init as we can delay that until demand (eg in doHandshake)
super.doStart();
}
@Override @Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
if (taskRunnerFactory != null) {
taskRunnerFactory.shutdownNow();
taskRunnerFactory = null;
}
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
channel = null; channel = null;

View File

@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory; import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
@ -536,13 +536,17 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
//closing the socket can hang also //closing the socket can hang also
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { // need a async task for this
final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
taskRunnerFactory.execute(new Runnable() {
public void run() { public void run() {
LOG.trace("Closing socket {}", socket);
try { try {
socket.close(); socket.close();
LOG.debug("Closed socket {}", socket);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception closing socket", e); LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
} }
} finally { } finally {
latch.countDown(); latch.countDown();
@ -554,14 +558,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
latch.await(1,TimeUnit.SECONDS); latch.await(1,TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally {
taskRunnerFactory.shutdownNow();
} }
} else { } else {
// close synchronously
LOG.trace("Closing socket {}", socket);
try { try {
socket.close(); socket.close();
LOG.debug("Closed socket {}", socket);
} catch (IOException e) { } catch (IOException e) {
LOG.debug("Caught exception closing socket",e); if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
}
} }
} }
} }

View File

@ -26,9 +26,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
@ -55,6 +55,7 @@ public class VMTransport implements Transport, Task {
// Implementation // Implementation
private LinkedBlockingQueue<Object> messageQueue; private LinkedBlockingQueue<Object> messageQueue;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunner taskRunner; private TaskRunner taskRunner;
// Transport State // Transport State
@ -188,6 +189,7 @@ public class VMTransport implements Transport, Task {
tr.shutdown(TimeUnit.SECONDS.toMillis(1)); tr.shutdown(TimeUnit.SECONDS.toMillis(1));
} catch(Exception e) { } catch(Exception e) {
} }
taskRunner = null;
} }
// let the peer know that we are disconnecting after attempting // let the peer know that we are disconnecting after attempting
@ -197,6 +199,12 @@ public class VMTransport implements Transport, Task {
peer.transportListener.onCommand(new ShutdownInfo()); peer.transportListener.onCommand(new ShutdownInfo());
} catch (Exception ignore) { } catch (Exception ignore) {
} }
// shutdown task runner factory
if (taskRunnerFactory != null) {
taskRunnerFactory.shutdownNow();
taskRunnerFactory = null;
}
} }
} }
@ -280,7 +288,11 @@ public class VMTransport implements Transport, Task {
throw new TransportDisposedIOException("The Transport has been disposed"); throw new TransportDisposedIOException("The Transport has been disposed");
} }
taskRunner = result = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("ActiveMQ VMTransport: " + toString());
taskRunnerFactory.init();
}
taskRunner = result = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
} }
} }
} }

View File

@ -30,7 +30,7 @@ public final class ThreadPoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class); private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 30 * 1000L; public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
/** /**
* Shutdown the given executor service only (ie not graceful shutdown). * Shutdown the given executor service only (ie not graceful shutdown).
@ -38,7 +38,7 @@ public final class ThreadPoolUtils {
* @see java.util.concurrent.ExecutorService#shutdown() * @see java.util.concurrent.ExecutorService#shutdown()
*/ */
public static void shutdown(ExecutorService executorService) { public static void shutdown(ExecutorService executorService) {
doShutdown(executorService, -1, true); doShutdown(executorService, 0);
} }
/** /**
@ -70,7 +70,7 @@ public final class ThreadPoolUtils {
* with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis. * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
*/ */
public static void shutdownGraceful(ExecutorService executorService) { public static void shutdownGraceful(ExecutorService executorService) {
doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION, false); doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
} }
/** /**
@ -83,62 +83,49 @@ public final class ThreadPoolUtils {
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt> * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to * is used as timeout value waiting for orderly shutdown to
* complete normally, before going aggressively. * complete normally, before going aggressively.
* <p/>
* Notice if the given parameter <tt>shutdownAwaitTermination</tt> is negative, then a quick shutdown
* is commenced, by invoking the {@link java.util.concurrent.ExecutorService#shutdown()} method
* and then exit from this method (ie. no graceful shutdown is performed).
* *
* @param executorService the executor service to shutdown * @param executorService the executor service to shutdown
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown, if the value if negative * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
* then the thread pool is <b>not</b> graceful shutdown, but a regular shutdown
* is commenced.
*/ */
public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
doShutdown(executorService, shutdownAwaitTermination, false); doShutdown(executorService, shutdownAwaitTermination);
} }
private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean quick) { private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) {
// code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
if (executorService == null) { if (executorService == null) {
return; return;
} }
if (quick) {
// do not shutdown graceful, but just quick shutdown on the thread pool
executorService.shutdown();
LOG.debug("Quick shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
return;
}
if (shutdownAwaitTermination <= 0) {
throw new IllegalArgumentException("ShutdownAwaitTermination must be a positive number, was: " + shutdownAwaitTermination);
}
// shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
// and try shutting down again. In both cases we wait at most the given shutdown timeout value given // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
// (total wait could then be 2 x shutdownAwaitTermination) // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
boolean warned = false; // we ought to shutdown much faster)
StopWatch watch = new StopWatch();
if (!executorService.isShutdown()) { if (!executorService.isShutdown()) {
boolean warned = false;
StopWatch watch = new StopWatch();
LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
executorService.shutdown(); executorService.shutdown();
try {
if (!awaitTermination(executorService, shutdownAwaitTermination)) { if (shutdownAwaitTermination > 0) {
warned = true; try {
LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
executorService.shutdownNow();
// we are now shutting down aggressively, so wait to see if we can completely shutdown or not
if (!awaitTermination(executorService, shutdownAwaitTermination)) { if (!awaitTermination(executorService, shutdownAwaitTermination)) {
LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
executorService.shutdownNow();
// we are now shutting down aggressively, so wait to see if we can completely shutdown or not
if (!awaitTermination(executorService, shutdownAwaitTermination)) {
LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
}
} }
} catch (InterruptedException e) {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
executorService.shutdownNow();
} }
} catch (InterruptedException e) {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
executorService.shutdownNow();
} }
// if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@ -155,8 +142,8 @@ public final class ThreadPoolUtils {
/** /**
* Awaits the termination of the thread pool. * Awaits the termination of the thread pool.
* <p/> * <p/>
* This implementation will log every 5th second at INFO level that we are waiting, so the end user * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
* can see we are not hanging in case it takes longer time to shutdown the pool. * can see we are not hanging in case it takes longer time to terminate the pool.
* *
* @param executorService the thread pool * @param executorService the thread pool
* @param shutdownAwaitTermination time in millis to use as timeout * @param shutdownAwaitTermination time in millis to use as timeout
@ -166,15 +153,15 @@ public final class ThreadPoolUtils {
public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
// log progress every 5th second so end user is aware of we are shutting down // log progress every 5th second so end user is aware of we are shutting down
StopWatch watch = new StopWatch(); StopWatch watch = new StopWatch();
long interval = Math.min(5000, shutdownAwaitTermination); long interval = Math.min(2000, shutdownAwaitTermination);
boolean done = false; boolean done = false;
while (!done && interval > 0) { while (!done && interval > 0) {
if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
done = true; done = true;
} else { } else {
LOG.info("Waited {} for ExecutorService: {} to shutdown...", TimeUtils.printDuration(watch.taken()), executorService); LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
// recalculate interval // recalculate interval
interval = Math.min(5000, shutdownAwaitTermination - watch.taken()); interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
} }
} }