diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index e856c5da53..5decd6c3ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Consumer; import javax.jms.IllegalStateException; import javax.management.InstanceNotFoundException; @@ -79,23 +80,24 @@ public class ManagedRegionBroker extends RegionBroker { private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); private final ManagementContext managementContext; private final ObjectName brokerObjectName; - private final Map topics = new ConcurrentHashMap(); - private final Map queues = new ConcurrentHashMap(); - private final Map temporaryQueues = new ConcurrentHashMap(); - private final Map temporaryTopics = new ConcurrentHashMap(); - private final Map queueSubscribers = new ConcurrentHashMap(); - private final Map topicSubscribers = new ConcurrentHashMap(); - private final Map durableTopicSubscribers = new ConcurrentHashMap(); - private final Map inactiveDurableTopicSubscribers = new ConcurrentHashMap(); - private final Map temporaryQueueSubscribers = new ConcurrentHashMap(); - private final Map temporaryTopicSubscribers = new ConcurrentHashMap(); - private final Map queueProducers = new ConcurrentHashMap(); - private final Map topicProducers = new ConcurrentHashMap(); - private final Map temporaryQueueProducers = new ConcurrentHashMap(); - private final Map temporaryTopicProducers = new ConcurrentHashMap(); - private final Map dynamicDestinationProducers = new ConcurrentHashMap(); - private final Map subscriptionKeys = new ConcurrentHashMap(); - private final Map subscriptionMap = new ConcurrentHashMap(); + private final Map topics = new ConcurrentHashMap<>(); + private final Map queues = new ConcurrentHashMap<>(); + private final Map temporaryQueues = new ConcurrentHashMap<>(); + private final Map temporaryTopics = new ConcurrentHashMap<>(); + private final Map queueSubscribers = new ConcurrentHashMap<>(); + private final Map topicSubscribers = new ConcurrentHashMap<>(); + private final Map durableTopicSubscribers = new ConcurrentHashMap<>(); + private final Map inactiveDurableTopicSubscribers = new ConcurrentHashMap<>(); + private final Map temporaryQueueSubscribers = new ConcurrentHashMap<>(); + private final Map temporaryTopicSubscribers = new ConcurrentHashMap<>(); + private final Map queueProducers = new ConcurrentHashMap<>(); + private final Map topicProducers = new ConcurrentHashMap<>(); + private final Map temporaryQueueProducers = new ConcurrentHashMap<>(); + private final Map temporaryTopicProducers = new ConcurrentHashMap<>(); + private final Map dynamicDestinationProducers = new ConcurrentHashMap<>(); + private final Map subscriptionKeys = new ConcurrentHashMap<>(); + private final Map subscriptionMap = new ConcurrentHashMap<>(); + private final Map consumerSubscriptionMap = new ConcurrentHashMap<>(); private final Set registeredMBeans = new ConcurrentHashMap<>().newKeySet(); /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; @@ -215,6 +217,7 @@ public class ManagedRegionBroker extends RegionBroker { registerSubscription(objectName, sub.getConsumerInfo(), key, view); } subscriptionMap.put(sub, objectName); + consumerSubscriptionMap.put(sub.getConsumerInfo(), sub); return objectName; } catch (Exception e) { LOG.error("Failed to register subscription {}", sub, e); @@ -249,11 +252,9 @@ public class ManagedRegionBroker extends RegionBroker { @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - for (Subscription sub : subscriptionMap.keySet()) { - if (sub.getConsumerInfo().equals(info)) { - // unregister all consumer subs - unregisterSubscription(subscriptionMap.get(sub), true); - } + if (consumerSubscriptionMap.containsKey(info)){ + Subscription sub = consumerSubscriptionMap.get(info); + unregisterSubscription(subscriptionMap.get(sub), true); } super.removeConsumer(context, info); } @@ -295,6 +296,7 @@ public class ManagedRegionBroker extends RegionBroker { public void unregisterSubscription(Subscription sub) { ObjectName name = subscriptionMap.remove(sub); + consumerSubscriptionMap.remove(sub.getConsumerInfo()); if (name != null) { try { SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); @@ -484,7 +486,7 @@ public class ManagedRegionBroker extends RegionBroker { } protected void buildExistingSubscriptions() throws Exception { - Map subscriptions = new HashMap(); + Map subscriptions = new HashMap<>(); Set destinations = destinationFactory.getDestinations(); if (destinations != null) { for (ActiveMQDestination dest : destinations) { @@ -609,7 +611,7 @@ public class ManagedRegionBroker extends RegionBroker { } private ObjectName[] onlyNonSuppressed (Set set){ - List nonSuppressed = new ArrayList(); + List nonSuppressed = new ArrayList<>(); for(ObjectName key : set){ if (managementContext.isAllowedToRegister(key)){ nonSuppressed.add(key);