diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index a32d4f6b8c..ac72534e39 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -22,13 +22,11 @@ import java.net.SocketException; import java.net.URI; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -40,13 +38,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.transaction.xa.XAResource; -import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.region.ConnectionStatistics; -import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerSubscriptionInfo; @@ -107,7 +101,6 @@ import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.NetworkBridgeUtils; -import org.apache.activemq.util.SubscriptionKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -196,13 +189,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); } Command command = (Command) o; - if (!brokerService.isStopping()) { - Response response = service(command); - if (response != null && !brokerService.isStopping()) { - dispatchSync(response); - } - } else { - throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); + Response response = service(command); + if (response != null) { + dispatchSync(response); } } finally { serviceLock.readLock().unlock(); @@ -332,10 +321,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); try { - if (!pendingStop) { + if (brokerService.isStopping()) { + response = responseRequired ? new ExceptionResponse( + new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null; + } else if (!pendingStop) { response = command.visit(this); } else { - response = new ExceptionResponse(transportException.get()); + response = responseRequired ? new ExceptionResponse(transportException.get()) : null; } } catch (Throwable e) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { @@ -465,10 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public int getActiveTransactionCount() { int rc = 0; for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { - Collection transactions = cs.getTransactionStates(); - for (TransactionState transaction : transactions) { - rc++; - } + rc += cs.getTransactionStates().size(); } return rc; } diff --git a/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml b/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml index b40cc59cc4..9a5df34fcc 100644 --- a/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml +++ b/activemq-unit-tests/src/test/resources/handleReplyToActivemq.xml @@ -1,3 +1,4 @@ +