This closes #208

This commit is contained in:
Hadrian Zbarcea 2016-11-04 09:38:44 -04:00
commit d756d35715
2 changed files with 13 additions and 25 deletions

View File

@ -22,13 +22,11 @@ import java.net.SocketException;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -40,13 +38,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.BrokerSubscriptionInfo; import org.apache.activemq.command.BrokerSubscriptionInfo;
@ -107,7 +101,6 @@ import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils; import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -196,13 +189,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
} }
Command command = (Command) o; Command command = (Command) o;
if (!brokerService.isStopping()) { Response response = service(command);
Response response = service(command); if (response != null) {
if (response != null && !brokerService.isStopping()) { dispatchSync(response);
dispatchSync(response);
}
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
} }
} finally { } finally {
serviceLock.readLock().unlock(); serviceLock.readLock().unlock();
@ -332,10 +321,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
boolean responseRequired = command.isResponseRequired(); boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId(); int commandId = command.getCommandId();
try { try {
if (!pendingStop) { if (brokerService.isStopping()) {
response = responseRequired ? new ExceptionResponse(
new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null;
} else if (!pendingStop) {
response = command.visit(this); response = command.visit(this);
} else { } else {
response = new ExceptionResponse(transportException.get()); response = responseRequired ? new ExceptionResponse(transportException.get()) : null;
} }
} catch (Throwable e) { } catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
@ -465,10 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public int getActiveTransactionCount() { public int getActiveTransactionCount() {
int rc = 0; int rc = 0;
for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
Collection<TransactionState> transactions = cs.getTransactionStates(); rc += cs.getTransactionStates().size();
for (TransactionState transaction : transactions) {
rc++;
}
} }
return rc; return rc;
} }

View File

@ -1,3 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
Licensed to the Apache Software Foundation (ASF) under one or more Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with contributor license agreements. See the NOTICE file distributed with
@ -21,18 +22,16 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/activemq-data/handle-replyto"> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/activemq-data/handle-replyto">
<jmsBridgeConnectors> <jmsBridgeConnectors>
<jmsQueueConnector> <jmsQueueConnector>
<inboundQueueBridges> <inboundQueueBridges>
<inboundQueueBridge inboundQueueName="QueueA" localQueueName = "localTestQ" <inboundQueueBridge inboundQueueName="QueueA" localQueueName="localTestQ" doHandleReplyTo="false" />
doHandleReplyTo="false"/>
</inboundQueueBridges> </inboundQueueBridges>
</jmsQueueConnector> </jmsQueueConnector>
</jmsBridgeConnectors> </jmsBridgeConnectors>
</broker> </broker>
</beans> </beans>
<!-- END SNIPPET: example -->