https://issues.apache.org/activemq/browse/AMQ-2448 - thread leak when network trying to connect to unavailable broker

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@932403 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-04-09 13:31:36 +00:00
parent 05f82a9b6d
commit 80a7ec59b4
5 changed files with 18 additions and 23 deletions

View File

@ -530,9 +530,6 @@
<!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ --> <!-- breaks hudson: disable till we get a chance to give it the time that it needs - http://hudson.zones.apache.org/hudson/job/ActiveMQ/org.apache.activemq$activemq-core/199/testReport/org.apache.activemq.network/BrokerNetworkWithStuckMessagesTest/testBrokerNetworkWithStuckMessages/ -->
<exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude> <exclude>**/BrokerNetworkWithStuckMessagesTest.*</exclude>
<!-- until https://issues.apache.org/activemq/browse/AMQ-2448 is fixed -->
<exclude>**/VmTransportNetworkBrokerTest.*</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -109,11 +109,15 @@ public class VMTransport implements Transport, Task {
peer.enqueueValve.decrement(); peer.enqueueValve.decrement();
} }
dispatch(peer, transportListener, command);
}
public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
if( transportListener!=null ) { if( transportListener!=null ) {
if( command == DISCONNECT ) { if( command == DISCONNECT ) {
transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else { } else {
peer.receiveCounter++; transport.receiveCounter++;
transportListener.onCommand(command); transportListener.onCommand(command);
} }
} }
@ -129,7 +133,7 @@ public class VMTransport implements Transport, Task {
Object command; Object command;
while ((command = messageQueue.poll()) != null && !stopping.get() ) { while ((command = messageQueue.poll()) != null && !stopping.get() ) {
receiveCounter++; receiveCounter++;
transportListener.onCommand(command); dispatch(this, transportListener, command);
} }
} }
started = true; started = true;
@ -149,7 +153,14 @@ public class VMTransport implements Transport, Task {
// If stop() is called while being start()ed.. then we can't stop until we return to the start() method. // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
if( enqueueValve.isOn() ) { if( enqueueValve.isOn() ) {
// let the peer know that we are disconnecting..
try {
peer.transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} catch (Exception ignore) {
}
TaskRunner tr = null; TaskRunner tr = null;
try { try {
enqueueValve.turnOff(); enqueueValve.turnOff();
@ -168,12 +179,10 @@ public class VMTransport implements Transport, Task {
if (tr != null) { if (tr != null) {
tr.shutdown(1000); tr.shutdown(1000);
} }
// let the peer know that we are disconnecting..
try {
oneway(DISCONNECT);
} catch (Exception ignore) {
}
} }
} }
/** /**

View File

@ -206,16 +206,7 @@ public class VMTransportFactory extends TransportFactory {
public static void stopped(VMTransportServer server) { public static void stopped(VMTransportServer server) {
String host = server.getBindURI().getHost(); String host = server.getBindURI().getHost();
SERVERS.remove(host); stopped(host);
TransportConnector connector = CONNECTORS.remove(host);
if (connector != null) {
LOG.debug("Shutting down VM connectors for broker: " + host);
ServiceSupport.dispose(connector);
BrokerService broker = BROKERS.remove(host);
if (broker != null) {
ServiceSupport.dispose(broker);
}
}
} }
public static void stopped(String host) { public static void stopped(String host) {

View File

@ -60,6 +60,5 @@
<property name="password" value="${activemq.password}"/> <property name="password" value="${activemq.password}"/>
</bean> </bean>
</property> </property>
<property name="useSingleConnection" value="true" />
</bean> </bean>
</beans> </beans>

View File

@ -58,6 +58,5 @@
<property name="password" value="${activemq.password}"/> <property name="password" value="${activemq.password}"/>
</bean> </bean>
</property> </property>
<property name="useSingleConnection" value="true" />
</bean> </bean>
</beans> </beans>