diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index f6ce000703..55ba224e35 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -925,11 +925,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { if (duplexBridge != null) { duplexBridge.stop(); } - // If the transport has not failed yet, - // notify the peer that we are doing a normal shutdown. - if (transportException == null) { - transport.oneway(new ShutdownInfo()); - } } } catch (Exception ignore) { @@ -943,12 +938,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { cs.getContext().getStopping().set(true); } - if (taskRunner != null) { - taskRunner.wakeup(); - // Give it a change to stop gracefully. - dispatchStoppedLatch.await(5, TimeUnit.SECONDS); - } - try { transport.stop(); LOG.debug("Stopped connection: " + transport.getRemoteAddress()); @@ -957,9 +946,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } if (taskRunner != null) { + taskRunner.wakeup(); + // Give it a change to stop gracefully. + dispatchStoppedLatch.await(5, TimeUnit.SECONDS); taskRunner.shutdown(); } - + active = false; // Run the MessageDispatch callbacks so that message references get diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java index 3911e07cf5..551e6eaa11 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubConnection.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.Service; import org.apache.activemq.command.Command; @@ -40,6 +41,7 @@ public class StubConnection implements Service { private Transport transport; private boolean shuttingDown; private TransportListener listener; + public AtomicReference error = new AtomicReference(); public StubConnection(BrokerService broker) throws Exception { this(TransportFactory.connect(broker.getVmConnectorURI())); @@ -63,13 +65,11 @@ public class StubConnection implements Service { } } - public void onException(IOException error) { + public void onException(IOException e) { if (listener != null) { - listener.onException(error); - } - if (!shuttingDown) { - error.printStackTrace(); + listener.onException(e); } + error.set(e); } }); transport.start();