mirror of
https://github.com/apache/activemq.git
synced 2025-02-28 13:19:07 +00:00
avoid deadlock on stop()
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@639531 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
47183aeeaa
commit
0302ffded7
@ -844,21 +844,23 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||||||
return manageable;
|
return manageable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void start() throws Exception {
|
public void start() throws Exception {
|
||||||
starting = true;
|
starting = true;
|
||||||
try {
|
try {
|
||||||
transport.start();
|
synchronized(this) {
|
||||||
|
transport.start();
|
||||||
|
|
||||||
if (taskRunnerFactory != null) {
|
if (taskRunnerFactory != null) {
|
||||||
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
|
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
|
||||||
+ getRemoteAddress());
|
+ getRemoteAddress());
|
||||||
} else {
|
} else {
|
||||||
taskRunner = null;
|
taskRunner = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
active = true;
|
active = true;
|
||||||
this.processDispatch(connector.getBrokerInfo());
|
this.processDispatch(connector.getBrokerInfo());
|
||||||
connector.onStarted(this);
|
connector.onStarted(this);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Force clean up on an error starting up.
|
// Force clean up on an error starting up.
|
||||||
stop();
|
stop();
|
||||||
@ -875,6 +877,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
|
synchronized (this) {
|
||||||
|
pendingStop = true;
|
||||||
|
if (starting) {
|
||||||
|
LOG.debug("stop() called in the middle of start(). Delaying...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
stopAsync();
|
stopAsync();
|
||||||
while( !stopped.await(5, TimeUnit.SECONDS) ) {
|
while( !stopped.await(5, TimeUnit.SECONDS) ) {
|
||||||
LOG.info("The connection to '" + transport.getRemoteAddress()+ "' is taking a long time to shutdown.");
|
LOG.info("The connection to '" + transport.getRemoteAddress()+ "' is taking a long time to shutdown.");
|
||||||
@ -884,13 +893,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||||||
public void stopAsync() {
|
public void stopAsync() {
|
||||||
// If we're in the middle of starting
|
// If we're in the middle of starting
|
||||||
// then go no further... for now.
|
// then go no further... for now.
|
||||||
synchronized (this) {
|
|
||||||
pendingStop = true;
|
|
||||||
if (starting) {
|
|
||||||
LOG.debug("stop() called in the middle of start(). Delaying...");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (stopping.compareAndSet(false, true)) {
|
if (stopping.compareAndSet(false, true)) {
|
||||||
|
|
||||||
// Let all the connection contexts know we are shutting down
|
// Let all the connection contexts know we are shutting down
|
||||||
@ -947,10 +949,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (taskRunner != null) {
|
if (taskRunner != null) {
|
||||||
taskRunner.wakeup();
|
taskRunner.shutdown(1);
|
||||||
// Give it a change to stop gracefully.
|
|
||||||
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
|
|
||||||
taskRunner.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
active = false;
|
active = false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user