From 5cc1a557ff9d272e7eebcbee9ac5cfa294a580d9 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 20 Dec 2011 20:42:18 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3625 Ensure that incoming commands get exception responses after the initial error is triggered so that client transports don't block wia intg for responses. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1221484 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 757b1d822b..02a1d193c1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -119,6 +119,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; + private Throwable stopError = null; /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -139,14 +140,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { - - if (pendingStop) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ignoring Command due to pending stop: " + o); - } - return; - } - serviceLock.readLock().lock(); try { if (!(o instanceof Command)) { @@ -258,6 +251,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ConnectionError ce = new ConnectionError(); ce.setException(e); dispatchSync(ce); + // Record the error that caused the transport to stop + this.stopError = e; // Wait a little bit to try to get the output buffer to flush // the exption notification to the client. try { @@ -292,7 +287,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); try { - response = command.visit(this); + if (!pendingStop) { + response = command.visit(this); + } else { + response = new ExceptionResponse(this.stopError); + } } catch (Throwable e) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") @@ -301,7 +300,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if(e instanceof java.lang.SecurityException){ // still need to close this down - in case the peer of this transport doesn't play nice - delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); + delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); } if (responseRequired) { @@ -928,10 +927,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } - public void delayedStop(final int waitTime, final String reason) { + public void delayedStop(final int waitTime, final String reason, Throwable cause) { if (waitTime > 0) { synchronized (this) { pendingStop = true; + stopError = cause; } try { DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {