Ensure that incoming commands get exception responses after the initial error is triggered so that client transports don't block wia intg for responses.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1221484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-12-20 20:42:18 +00:00
parent c856f30bc5
commit 5cc1a557ff
1 changed files with 11 additions and 11 deletions

View File

@ -119,6 +119,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId; private String duplexNetworkConnectorId;
private Throwable stopError = null;
/** /**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@ -139,14 +140,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.transport.setTransportListener(new DefaultTransportListener() { this.transport.setTransportListener(new DefaultTransportListener() {
@Override @Override
public void onCommand(Object o) { public void onCommand(Object o) {
if (pendingStop) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ignoring Command due to pending stop: " + o);
}
return;
}
serviceLock.readLock().lock(); serviceLock.readLock().lock();
try { try {
if (!(o instanceof Command)) { if (!(o instanceof Command)) {
@ -258,6 +251,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
ConnectionError ce = new ConnectionError(); ConnectionError ce = new ConnectionError();
ce.setException(e); ce.setException(e);
dispatchSync(ce); dispatchSync(ce);
// Record the error that caused the transport to stop
this.stopError = e;
// Wait a little bit to try to get the output buffer to flush // Wait a little bit to try to get the output buffer to flush
// the exption notification to the client. // the exption notification to the client.
try { try {
@ -292,7 +287,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
boolean responseRequired = command.isResponseRequired(); boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId(); int commandId = command.getCommandId();
try { try {
if (!pendingStop) {
response = command.visit(this); response = command.visit(this);
} else {
response = new ExceptionResponse(this.stopError);
}
} catch (Throwable e) { } catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
@ -301,7 +300,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
if(e instanceof java.lang.SecurityException){ if(e instanceof java.lang.SecurityException){
// still need to close this down - in case the peer of this transport doesn't play nice // still need to close this down - in case the peer of this transport doesn't play nice
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
} }
if (responseRequired) { if (responseRequired) {
@ -928,10 +927,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
public void delayedStop(final int waitTime, final String reason) { public void delayedStop(final int waitTime, final String reason, Throwable cause) {
if (waitTime > 0) { if (waitTime > 0) {
synchronized (this) { synchronized (this) {
pendingStop = true; pendingStop = true;
stopError = cause;
} }
try { try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {