From 5e03394423e1399788704badd0e2eb72c08478c7 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 17 Mar 2008 13:30:38 +0000 Subject: [PATCH] send shutdown to transports asynchronously - as they may be blocked git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@637879 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 6ebb177742..e21387465b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -21,6 +21,10 @@ import java.security.GeneralSecurityException; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -75,7 +79,7 @@ import org.apache.commons.logging.LogFactory; public abstract class DemandForwardingBridgeSupport implements NetworkBridge { private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class); - + private static final ThreadPoolExecutor STOP_TASKS; protected final Transport localBroker; protected final Transport remoteBroker; protected final IdGenerator idGenerator = new IdGenerator(); @@ -113,6 +117,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { private boolean createdByDuplex; private BrokerInfo localBrokerInfo; private BrokerInfo remoteBrokerInfo; + private AtomicBoolean started = new AtomicBoolean(); @@ -331,10 +336,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { try { disposed = true; remoteBridgeStarted.set(false); - localBroker.oneway(new ShutdownInfo()); - remoteBroker.oneway(new ShutdownInfo()); - } catch (IOException e) { - LOG.debug("Caught exception stopping", e); + final CountDownLatch sendShutdown = new CountDownLatch(1); + STOP_TASKS.execute(new Runnable() { + public void run() { + try { + localBroker.oneway(new ShutdownInfo()); + remoteBroker.oneway(new ShutdownInfo()); + } catch (Throwable e) { + LOG.debug("Caught exception sending shutdown", e); + }finally { + sendShutdown.countDown(); + } + + } + }); + if( !sendShutdown.await(100, TimeUnit.MILLISECONDS) ) { + LOG.debug("Network Could not shutdown in a timely manner"); + } } finally { ServiceStopper ss = new ServiceStopper(); ss.stop(localBroker); @@ -636,7 +654,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { } } } catch (Throwable e) { - e.printStackTrace(); + LOG.warn("Caught an exception processing local command",e); serviceLocalException(e); } } @@ -950,5 +968,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { protected boolean isDuplex() { return configuration.isDuplex() || createdByDuplex; } + + static { + STOP_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "NetworkBridge: "+runnable); + thread.setDaemon(true); + return thread; + } + }); + } }