r239@34: chirino | 2007-02-23 14:48:46 -0500

Sync oneway of a ShutdownInfo from the broker to the client could deadlock the vm transport.  Sending that ShutdownInfo in the async dispatch thread now.
 
 


git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511083 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-02-23 20:23:49 +00:00
parent e04bd6f0e1
commit aa829bee7b
1 changed files with 15 additions and 11 deletions

View File

@ -126,6 +126,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected final AtomicBoolean asyncException = new AtomicBoolean(false); protected final AtomicBoolean asyncException = new AtomicBoolean(false);
private ConnectionContext context; private ConnectionContext context;
private boolean networkConnection; private boolean networkConnection;
private CountDownLatch dispatchStopped = new CountDownLatch(1);
static class ConnectionState extends org.apache.activemq.state.ConnectionState { static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context; private final ConnectionContext context;
@ -793,6 +794,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
sub.run(); sub.run();
} }
} }
} else if( command.isShutdownInfo() ) {
dispatch(command);
dispatchStopped.countDown();
} else { } else {
dispatch(command); dispatch(command);
} }
@ -868,21 +872,19 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if (masterBroker != null){ if (masterBroker != null){
masterBroker.stop(); masterBroker.stop();
} }
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if( transportException == null ) {
transport.oneway(new ShutdownInfo());
}
} catch (Exception ignore) { } catch (Exception ignore) {
//ignore.printStackTrace();
} }
transport.stop();
active = false;
if(disposed.compareAndSet(false, true)) { if(disposed.compareAndSet(false, true)) {
// Clear out what's on the queue so that we can send the Shutdown command quicker.
dispatchQueue.clear();
dispatchAsync(new ShutdownInfo());
// Wait up to 10 seconds for the shutdown command to be sent to
// the client.
dispatchStopped.await(10, TimeUnit.SECONDS);
if( taskRunner!=null ) if( taskRunner!=null )
taskRunner.shutdown(); taskRunner.shutdown();
@ -911,6 +913,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
stopLatch.countDown(); stopLatch.countDown();
} }
transport.stop();
active = false;
log.debug("Stopped connection: "+transport.getRemoteAddress()); log.debug("Stopped connection: "+transport.getRemoteAddress());
} }