From 80a7ec59b4896fb23366485e929769fc9ab82982 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 9 Apr 2010 13:31:36 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2448 - thread leak when network trying to connect to unavailable broker git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@932403 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 3 --- .../activemq/transport/vm/VMTransport.java | 25 +++++++++++++------ .../transport/vm/VMTransportFactory.java | 11 +------- assembly/src/release/conf/camel.xml | 1 - .../camel/WEB-INF/applicationContext.xml | 1 - 5 files changed, 18 insertions(+), 23 deletions(-) diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index bb0b42091b..3f7076409b 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -530,9 +530,6 @@ **/BrokerNetworkWithStuckMessagesTest.* - - **/VmTransportNetworkBrokerTest.* - diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 4bdc7426eb..b795c32ad6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -109,11 +109,15 @@ public class VMTransport implements Transport, Task { peer.enqueueValve.decrement(); } + dispatch(peer, transportListener, command); + } + + public void dispatch(VMTransport transport, TransportListener transportListener, Object command) { if( transportListener!=null ) { if( command == DISCONNECT ) { transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { - peer.receiveCounter++; + transport.receiveCounter++; transportListener.onCommand(command); } } @@ -129,7 +133,7 @@ public class VMTransport implements Transport, Task { Object command; while ((command = messageQueue.poll()) != null && !stopping.get() ) { receiveCounter++; - transportListener.onCommand(command); + dispatch(this, transportListener, command); } } started = true; @@ -149,7 +153,14 @@ public class VMTransport implements Transport, Task { // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. if( enqueueValve.isOn() ) { - + + // let the peer know that we are disconnecting.. + try { + peer.transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); + } catch (Exception ignore) { + } + + TaskRunner tr = null; try { enqueueValve.turnOff(); @@ -168,12 +179,10 @@ public class VMTransport implements Transport, Task { if (tr != null) { tr.shutdown(1000); } - // let the peer know that we are disconnecting.. - try { - oneway(DISCONNECT); - } catch (Exception ignore) { - } + + } + } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index f1c03c72ce..3bdf95d4c8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -206,16 +206,7 @@ public class VMTransportFactory extends TransportFactory { public static void stopped(VMTransportServer server) { String host = server.getBindURI().getHost(); - SERVERS.remove(host); - TransportConnector connector = CONNECTORS.remove(host); - if (connector != null) { - LOG.debug("Shutting down VM connectors for broker: " + host); - ServiceSupport.dispose(connector); - BrokerService broker = BROKERS.remove(host); - if (broker != null) { - ServiceSupport.dispose(broker); - } - } + stopped(host); } public static void stopped(String host) { diff --git a/assembly/src/release/conf/camel.xml b/assembly/src/release/conf/camel.xml index 7378474853..8e5b7422fe 100644 --- a/assembly/src/release/conf/camel.xml +++ b/assembly/src/release/conf/camel.xml @@ -60,6 +60,5 @@ - diff --git a/assembly/src/release/webapps/camel/WEB-INF/applicationContext.xml b/assembly/src/release/webapps/camel/WEB-INF/applicationContext.xml index daeaea72ae..d242f8926c 100644 --- a/assembly/src/release/webapps/camel/WEB-INF/applicationContext.xml +++ b/assembly/src/release/webapps/camel/WEB-INF/applicationContext.xml @@ -58,6 +58,5 @@ -