diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index b5e1c5542e..a86a36efb2 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -117,7 +117,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { protected BrokerInfo brokerInfo; protected final List dispatchQueue = new LinkedList(); protected TaskRunner taskRunner; - protected final AtomicReference transportException = new AtomicReference(); + protected final AtomicReference transportException = new AtomicReference(); protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); private final Transport transport; private MessageAuthorizationPolicy messageAuthorizationPolicy; @@ -152,7 +152,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; - private Throwable stopError = null; /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -245,7 +244,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { TRANSPORTLOG.warn(this + " failed: " + e); } - stopAsync(); + stopAsync(e); } } @@ -295,7 +294,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ce.setException(e); dispatchSync(ce); // Record the error that caused the transport to stop - this.stopError = e; + transportException.set(e); // Wait a little bit to try to get the output buffer to flush // the exception notification to the client. try { @@ -334,7 +333,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if (!pendingStop) { response = command.visit(this); } else { - response = new ExceptionResponse(this.stopError); + response = new ExceptionResponse(transportException.get()); } } catch (Throwable e) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { @@ -863,7 +862,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { iter.remove(); } try { - broker.removeConnection(cs.getContext(), cs.getInfo(), null); + broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); } catch (Throwable e) { SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); } @@ -1073,7 +1072,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if (waitTime > 0) { synchronized (this) { pendingStop = true; - stopError = cause; + transportException.set(cause); } try { stopTaskRunnerFactory.execute(new Runnable() { @@ -1093,6 +1092,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + public void stopAsync(Throwable cause) { + transportException.set(cause); + stopAsync(); + } + public void stopAsync() { // If we're in the middle of starting then go no further... for now. synchronized (this) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index bdeeb7646e..e975b4c014 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -266,7 +266,7 @@ public class RegionBroker extends EmptyBroker { LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); if (connection instanceof TransportConnection) { TransportConnection transportConnection = (TransportConnection) connection; - transportConnection.stopAsync(); + transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId())); } else { connection.stop(); } diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java index ec944fbaa4..60fce40f47 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/LinkStealingTest.java @@ -18,20 +18,32 @@ package org.apache.activemq.broker; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ConnectionInfo; import javax.jms.Connection; import javax.jms.InvalidClientIDException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class LinkStealingTest extends TestCase { protected BrokerService brokerService; protected int timeOutInSeconds = 10; + protected final AtomicReference removeException = new AtomicReference(); @Override protected void setUp() throws Exception { brokerService = new BrokerService(); brokerService.setPersistent(false); + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { + removeException.set(error); + super.removeConnection(context, info, error); + } + } + }); } @Override @@ -86,6 +98,7 @@ public class LinkStealingTest extends TestCase { exceptionFlag.set(true); } assertFalse(exceptionFlag.get()); + assertNotNull(removeException.get()); } }