Added a ReentrantReadWriteLock to guard against a service call executing while a connection is being shutdown.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634505 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-07 02:33:32 +00:00
parent bb763f5f7a
commit 4c481ec6a9
1 changed files with 61 additions and 37 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -32,8 +31,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
@ -90,7 +89,6 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -98,7 +96,7 @@ import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision: 1.8 $ * @version $Revision: 1.8 $
*/ */
public class TransportConnection implements Service, Connection, Task, CommandVisitor { public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Log LOG = LogFactory.getLog(TransportConnection.class); private static final Log LOG = LogFactory.getLog(TransportConnection.class);
private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
@ -137,9 +135,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private boolean starting; private boolean starting;
private boolean pendingStop; private boolean pendingStop;
private long timeStamp; private long timeStamp;
private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicBoolean transportDisposed = new AtomicBoolean(); private CountDownLatch stopped = new CountDownLatch(1);
private CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false); private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
@ -151,7 +148,10 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
private DemandForwardingBridge duplexBridge; private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory; private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
/** /**
* @param connector * @param connector
* @param transport * @param transport
@ -175,15 +175,25 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
this.transport.setTransportListener(new DefaultTransportListener() { this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) { public void onCommand(Object o) {
Command command = (Command)o; serviceLock.readLock().lock();
Response response = service(command); try {
if (response != null) { Command command = (Command)o;
dispatchSync(response); Response response = service(command);
if (response != null) {
dispatchSync(response);
}
} finally {
serviceLock.readLock().unlock();
} }
} }
public void onException(IOException exception) { public void onException(IOException exception) {
serviceTransportException(exception); serviceLock.readLock().lock();
try {
serviceTransportException(exception);
} finally {
serviceLock.readLock().unlock();
}
} }
}); });
connected = true; connected = true;
@ -199,12 +209,12 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
if (!stopped.get()) { if (!stopping.get()) {
transportException.set(e); transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) { if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e); TRANSPORTLOG.debug("Transport failed: " + e, e);
} }
ServiceSupport.dispose(this); stopAsync();
} }
} }
@ -218,7 +228,6 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
public void serviceExceptionAsync(final IOException e) { public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) { if (asyncException.compareAndSet(false, true)) {
new Thread("Async Exception Handler") { new Thread("Async Exception Handler") {
public void run() { public void run() {
serviceException(e); serviceException(e);
} }
@ -241,7 +250,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// Handle the case where the broker is stopped // Handle the case where the broker is stopped
// But the client is still connected. // But the client is still connected.
if (!stopped.get()) { if (!stopping.get()) {
if (SERVICELOG.isDebugEnabled()) { if (SERVICELOG.isDebugEnabled()) {
SERVICELOG SERVICELOG
.debug("Broker has been stopped. Notifying client and closing his connection."); .debug("Broker has been stopped. Notifying client and closing his connection.");
@ -259,9 +268,9 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
// Worst case is we just kill the connection before the // Worst case is we just kill the connection before the
// notification gets to him. // notification gets to him.
ServiceSupport.dispose(this); stopAsync();
} }
} else if (!stopped.get() && !inServiceException) { } else if (!stopping.get() && !inServiceException) {
inServiceException = true; inServiceException = true;
try { try {
SERVICELOG.error("Async error occurred: " + e, e); SERVICELOG.error("Async error occurred: " + e, e);
@ -324,15 +333,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public Response processShutdown(ShutdownInfo info) throws Exception { public Response processShutdown(ShutdownInfo info) throws Exception {
new Thread("Async Exception Handler") { stopAsync();
public void run() {
try {
TransportConnection.this.stop();
} catch (Exception e) {
serviceException(e);
}
}
}.start();
return null; return null;
} }
@ -735,7 +736,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
public void dispatchAsync(Command message) { public void dispatchAsync(Command message) {
if (!stopped.get()) { if (!stopping.get()) {
//getStatistics().getEnqueues().increment(); //getStatistics().getEnqueues().increment();
if (taskRunner == null) { if (taskRunner == null) {
dispatchSync(message); dispatchSync(message);
@ -763,7 +764,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
? command : null); ? command : null);
try { try {
if (!stopped.get()) { if (!stopping.get()) {
if (messageDispatch != null) { if (messageDispatch != null) {
broker.preProcessDispatch(messageDispatch); broker.preProcessDispatch(messageDispatch);
} }
@ -783,7 +784,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
public boolean iterate() { public boolean iterate() {
try { try {
if (stopped.get()) { if (stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) { if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) { if (transportException.get() == null) {
try { try {
@ -865,8 +866,14 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
} }
} }
} }
public void stop() throws Exception { public void stop() throws Exception {
stopAsync();
if( !stopped.await(10, TimeUnit.SECONDS) ) {
LOG.info("Could not shutdown the connection to '" + transport.getRemoteAddress()+ "' in a timely manner.");
}
}
public void stopAsync() {
// If we're in the middle of starting // If we're in the middle of starting
// then go no further... for now. // then go no further... for now.
synchronized (this) { synchronized (this) {
@ -876,14 +883,31 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return; return;
} }
} }
if (stopped.compareAndSet(false, true)) { if (stopping.compareAndSet(false, true)) {
doStop(); new Thread("ActiveMQ Transport Stopper: "+ transport.getRemoteAddress()) {
stopLatch.countDown(); @Override
} else { public void run() {
stopLatch.await(1, TimeUnit.SECONDS); // make sure we are not servicing client requests while we are shutting down.
serviceLock.writeLock().lock();
try {
doStop();
} catch (Throwable e) {
LOG.info("Error occured while shutting down a connection to '" + transport.getRemoteAddress()+ "': "+e);
LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()+ "': ", e);
} finally {
stopped.countDown();
serviceLock.writeLock().unlock();
}
}
}.start();
} }
} }
@Override
public String toString() {
return "Transport Connection to: "+transport.getRemoteAddress();
}
protected void doStop() throws Exception, InterruptedException { protected void doStop() throws Exception, InterruptedException {
LOG.debug("Stopping connection: " + transport.getRemoteAddress()); LOG.debug("Stopping connection: " + transport.getRemoteAddress());
connector.onStopped(this); connector.onStopped(this);