mirror of https://github.com/apache/activemq.git
send shutdown to transports asynchronously - as they may be blocked
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637879 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
372f69aba9
commit
5e03394423
|
@ -21,6 +21,10 @@ import java.security.GeneralSecurityException;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -75,7 +79,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
|
private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
|
||||||
|
private static final ThreadPoolExecutor STOP_TASKS;
|
||||||
protected final Transport localBroker;
|
protected final Transport localBroker;
|
||||||
protected final Transport remoteBroker;
|
protected final Transport remoteBroker;
|
||||||
protected final IdGenerator idGenerator = new IdGenerator();
|
protected final IdGenerator idGenerator = new IdGenerator();
|
||||||
|
@ -114,6 +118,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
private BrokerInfo localBrokerInfo;
|
private BrokerInfo localBrokerInfo;
|
||||||
private BrokerInfo remoteBrokerInfo;
|
private BrokerInfo remoteBrokerInfo;
|
||||||
|
|
||||||
|
|
||||||
private AtomicBoolean started = new AtomicBoolean();
|
private AtomicBoolean started = new AtomicBoolean();
|
||||||
|
|
||||||
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
|
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
|
||||||
|
@ -331,10 +336,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
try {
|
try {
|
||||||
disposed = true;
|
disposed = true;
|
||||||
remoteBridgeStarted.set(false);
|
remoteBridgeStarted.set(false);
|
||||||
localBroker.oneway(new ShutdownInfo());
|
final CountDownLatch sendShutdown = new CountDownLatch(1);
|
||||||
remoteBroker.oneway(new ShutdownInfo());
|
STOP_TASKS.execute(new Runnable() {
|
||||||
} catch (IOException e) {
|
public void run() {
|
||||||
LOG.debug("Caught exception stopping", e);
|
try {
|
||||||
|
localBroker.oneway(new ShutdownInfo());
|
||||||
|
remoteBroker.oneway(new ShutdownInfo());
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.debug("Caught exception sending shutdown", e);
|
||||||
|
}finally {
|
||||||
|
sendShutdown.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) {
|
||||||
|
LOG.debug("Network Could not shutdown in a timely manner");
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ServiceStopper ss = new ServiceStopper();
|
ServiceStopper ss = new ServiceStopper();
|
||||||
ss.stop(localBroker);
|
ss.stop(localBroker);
|
||||||
|
@ -636,7 +654,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
e.printStackTrace();
|
LOG.warn("Caught an exception processing local command",e);
|
||||||
serviceLocalException(e);
|
serviceLocalException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -951,4 +969,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
||||||
return configuration.isDuplex() || createdByDuplex;
|
return configuration.isDuplex() || createdByDuplex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
STOP_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||||
|
public Thread newThread(Runnable runnable) {
|
||||||
|
Thread thread = new Thread(runnable, "NetworkBridge: "+runnable);
|
||||||
|
thread.setDaemon(true);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue