mirror of https://github.com/apache/activemq.git
- memory leak fix: TransportConnection would leak memory if an error occured while start()ing the connection. Most visible when you create a network connector pointing at a remote broker that was down since this loops through creating TransportConnectors every few seconds.
- Deadlock fix VMTransport could dead lock if during start() an error occured and stop was recusively called. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@592114 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
62890b3fbb
commit
ada47fbc50
|
@ -846,6 +846,10 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
||||||
active = true;
|
active = true;
|
||||||
this.processDispatch(connector.getBrokerInfo());
|
this.processDispatch(connector.getBrokerInfo());
|
||||||
connector.onStarted(this);
|
connector.onStarted(this);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Force clean up on an error starting up.
|
||||||
|
stop();
|
||||||
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
// stop() can be called from within the above block,
|
// stop() can be called from within the above block,
|
||||||
// but we want to be sure start() completes before
|
// but we want to be sure start() completes before
|
||||||
|
|
|
@ -48,7 +48,7 @@ public final class Valve {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isOn() {
|
public boolean isOn() {
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
return on;
|
return on;
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class VMTransport implements Transport, Task {
|
||||||
enqueueValve.turnOff();
|
enqueueValve.turnOff();
|
||||||
if (messageQueue != null && !async) {
|
if (messageQueue != null && !async) {
|
||||||
Object command;
|
Object command;
|
||||||
while ((command = messageQueue.poll()) != null) {
|
while ((command = messageQueue.poll()) != null && !stopping.get() ) {
|
||||||
transportListener.onCommand(command);
|
transportListener.onCommand(command);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,27 +124,37 @@ public class VMTransport implements Transport, Task {
|
||||||
} finally {
|
} finally {
|
||||||
enqueueValve.turnOn();
|
enqueueValve.turnOn();
|
||||||
}
|
}
|
||||||
|
// If we get stopped while starting up, then do the actual stop now
|
||||||
|
// that the enqueueValve is back on.
|
||||||
|
if( stopping.get() ) {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
TaskRunner tr = null;
|
stopping.set(true);
|
||||||
try {
|
|
||||||
stopping.set(true);
|
// If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
|
||||||
enqueueValve.turnOff();
|
if( enqueueValve.isOn() ) {
|
||||||
if (!disposed) {
|
|
||||||
started = false;
|
TaskRunner tr = null;
|
||||||
disposed = true;
|
try {
|
||||||
if (taskRunner != null) {
|
enqueueValve.turnOff();
|
||||||
tr = taskRunner;
|
if (!disposed) {
|
||||||
taskRunner = null;
|
started = false;
|
||||||
|
disposed = true;
|
||||||
|
if (taskRunner != null) {
|
||||||
|
tr = taskRunner;
|
||||||
|
taskRunner = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
stopping.set(false);
|
||||||
|
enqueueValve.turnOn();
|
||||||
|
}
|
||||||
|
if (tr != null) {
|
||||||
|
tr.shutdown(1000);
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
stopping.set(false);
|
|
||||||
enqueueValve.turnOn();
|
|
||||||
}
|
|
||||||
if (tr != null) {
|
|
||||||
tr.shutdown(1000);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue