AMQ-6494 Return ExceptionResponse during broker service shutdown

This commit is contained in:
Hadrian Zbarcea 2016-11-03 02:04:16 -04:00
parent 016ae05d0e
commit dce2b61f87
2 changed files with 13 additions and 25 deletions

View File

@ -22,13 +22,11 @@ import java.net.SocketException;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -40,13 +38,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.BrokerSubscriptionInfo; import org.apache.activemq.command.BrokerSubscriptionInfo;
@ -107,7 +101,6 @@ import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.NetworkBridgeUtils; import org.apache.activemq.util.NetworkBridgeUtils;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -196,14 +189,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
} }
Command command = (Command) o; Command command = (Command) o;
if (!brokerService.isStopping()) {
Response response = service(command); Response response = service(command);
if (response != null && !brokerService.isStopping()) { if (response != null) {
dispatchSync(response); dispatchSync(response);
} }
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
}
} finally { } finally {
serviceLock.readLock().unlock(); serviceLock.readLock().unlock();
} }
@ -332,10 +321,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
boolean responseRequired = command.isResponseRequired(); boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId(); int commandId = command.getCommandId();
try { try {
if (!pendingStop) { if (brokerService.isStopping()) {
response = responseRequired ? new ExceptionResponse(
new BrokerStoppedException("Broker " + brokerService + " is being stopped")) : null;
} else if (!pendingStop) {
response = command.visit(this); response = command.visit(this);
} else { } else {
response = new ExceptionResponse(transportException.get()); response = responseRequired ? new ExceptionResponse(transportException.get()) : null;
} }
} catch (Throwable e) { } catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
@ -465,10 +457,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public int getActiveTransactionCount() { public int getActiveTransactionCount() {
int rc = 0; int rc = 0;
for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
Collection<TransactionState> transactions = cs.getTransactionStates(); rc += cs.getTransactionStates().size();
for (TransactionState transaction : transactions) {
rc++;
}
} }
return rc; return rc;
} }

View File

@ -1,3 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- <!--
Licensed to the Apache Software Foundation (ASF) under one or more Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with contributor license agreements. See the NOTICE file distributed with
@ -26,8 +27,7 @@
<jmsBridgeConnectors> <jmsBridgeConnectors>
<jmsQueueConnector> <jmsQueueConnector>
<inboundQueueBridges> <inboundQueueBridges>
<inboundQueueBridge inboundQueueName="QueueA" localQueueName = "localTestQ" <inboundQueueBridge inboundQueueName="QueueA" localQueueName="localTestQ" doHandleReplyTo="false" />
doHandleReplyTo="false"/>
</inboundQueueBridges> </inboundQueueBridges>
</jmsQueueConnector> </jmsQueueConnector>
</jmsBridgeConnectors> </jmsBridgeConnectors>
@ -35,4 +35,3 @@ doHandleReplyTo="false"/>
</broker> </broker>
</beans> </beans>
<!-- END SNIPPET: example -->