From ada47fbc50e8a324538517ba262490499cd05530 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Mon, 5 Nov 2007 18:47:16 +0000 Subject: [PATCH] - memory leak fix: TransportConnection would leak memory if an error occured while start()ing the connection. Most visible when you create a network connector pointing at a remote broker that was down since this loops through creating TransportConnectors every few seconds. - Deadlock fix VMTransport could dead lock if during start() an error occured and stop was recusively called. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@592114 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 4 ++ .../org/apache/activemq/thread/Valve.java | 2 +- .../activemq/transport/vm/VMTransport.java | 44 ++++++++++++------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 007a8264c4..e12bb1b684 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -846,6 +846,10 @@ public class TransportConnection implements Service, Connection, Task, CommandVi active = true; this.processDispatch(connector.getBrokerInfo()); connector.onStarted(this); + } catch (Exception e) { + // Force clean up on an error starting up. + stop(); + throw e; } finally { // stop() can be called from within the above block, // but we want to be sure start() completes before diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java b/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java index abb2716a0f..75d45ae45b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/Valve.java @@ -48,7 +48,7 @@ public final class Valve { } } - boolean isOn() { + public boolean isOn() { synchronized (mutex) { return on; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 4a3dbee461..799fe24dbe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -115,7 +115,7 @@ public class VMTransport implements Transport, Task { enqueueValve.turnOff(); if (messageQueue != null && !async) { Object command; - while ((command = messageQueue.poll()) != null) { + while ((command = messageQueue.poll()) != null && !stopping.get() ) { transportListener.onCommand(command); } } @@ -124,27 +124,37 @@ public class VMTransport implements Transport, Task { } finally { enqueueValve.turnOn(); } + // If we get stopped while starting up, then do the actual stop now + // that the enqueueValve is back on. + if( stopping.get() ) { + stop(); + } } public void stop() throws Exception { - TaskRunner tr = null; - try { - stopping.set(true); - enqueueValve.turnOff(); - if (!disposed) { - started = false; - disposed = true; - if (taskRunner != null) { - tr = taskRunner; - taskRunner = null; + stopping.set(true); + + // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. + if( enqueueValve.isOn() ) { + + TaskRunner tr = null; + try { + enqueueValve.turnOff(); + if (!disposed) { + started = false; + disposed = true; + if (taskRunner != null) { + tr = taskRunner; + taskRunner = null; + } } + } finally { + stopping.set(false); + enqueueValve.turnOn(); + } + if (tr != null) { + tr.shutdown(1000); } - } finally { - stopping.set(false); - enqueueValve.turnOn(); - } - if (tr != null) { - tr.shutdown(1000); } }