diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 5decd6c3ca..fa28379560 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.function.Consumer; import javax.jms.IllegalStateException; +import javax.jms.JMSException; import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -42,6 +42,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; +import org.apache.activemq.broker.region.AbstractRegion; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFactory; import org.apache.activemq.broker.region.DestinationInterceptor; @@ -61,6 +62,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -97,8 +99,7 @@ public class ManagedRegionBroker extends RegionBroker { private final Map dynamicDestinationProducers = new ConcurrentHashMap<>(); private final Map subscriptionKeys = new ConcurrentHashMap<>(); private final Map subscriptionMap = new ConcurrentHashMap<>(); - private final Map consumerSubscriptionMap = new ConcurrentHashMap<>(); - private final Set registeredMBeans = new ConcurrentHashMap<>().newKeySet(); + private final Set registeredMBeans = ConcurrentHashMap.newKeySet(); /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; @@ -217,7 +218,6 @@ public class ManagedRegionBroker extends RegionBroker { registerSubscription(objectName, sub.getConsumerInfo(), key, view); } subscriptionMap.put(sub, objectName); - consumerSubscriptionMap.put(sub.getConsumerInfo(), sub); return objectName; } catch (Exception e) { LOG.error("Failed to register subscription {}", sub, e); @@ -252,13 +252,62 @@ public class ManagedRegionBroker extends RegionBroker { @Override public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - if (consumerSubscriptionMap.containsKey(info)){ - Subscription sub = consumerSubscriptionMap.get(info); - unregisterSubscription(subscriptionMap.get(sub), true); + //Find subscriptions quickly by relying on the maps contained in the different Regions + //that map consumer ids and subscriptions + final Set subscriptions = findSubscriptions(info); + + if (!subscriptions.isEmpty()) { + for (Subscription sub : subscriptions) { + // unregister all consumer subs + unregisterSubscription(subscriptionMap.get(sub), true); + break; + } + } else { + //Fall back to old slow approach where we go through the entire subscription map case something went wrong + //and no subscriptions were found - should generally not happen + for (Subscription sub : subscriptionMap.keySet()) { + if (sub.getConsumerInfo().equals(info)) { + unregisterSubscription(subscriptionMap.get(sub), true); + } + } } + super.removeConsumer(context, info); } + private Set findSubscriptions(final ConsumerInfo info) { + final Set subscriptions = new HashSet<>(); + + try { + if (info.getDestination() != null) { + final ActiveMQDestination consumerDest = info.getDestination(); + //If it's composite then go through and find the subscription for every dest in case different + if (consumerDest.isComposite()) { + ActiveMQDestination[] destinations = consumerDest.getCompositeDestinations(); + for (ActiveMQDestination destination : destinations) { + addSubscriptionToList(subscriptions, info.getConsumerId(), destination); + } + } else { + //This is the case for a non-composite destination which would be most of the time + addSubscriptionToList(subscriptions, info.getConsumerId(), info.getDestination()); + } + } + } catch (Exception e) { + LOG.warn("Error finding subscription {}: {}", info, e.getMessage()); + } + + return subscriptions; + } + + private void addSubscriptionToList(Set subscriptions, + ConsumerId consumerId, ActiveMQDestination dest) throws JMSException { + final Subscription matchingSub = ((AbstractRegion) this.getRegion(dest)) + .getSubscriptions().get(consumerId); + if (matchingSub != null) { + subscriptions.add(matchingSub); + } + } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { super.addProducer(context, info); @@ -296,7 +345,6 @@ public class ManagedRegionBroker extends RegionBroker { public void unregisterSubscription(Subscription sub) { ObjectName name = subscriptionMap.remove(sub); - consumerSubscriptionMap.remove(sub.getConsumerInfo()); if (name != null) { try { SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java new file mode 100644 index 0000000000..2e356504a3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxConsumerRemovalTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import junit.framework.Test; +import junit.textui.TestRunner; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JmxConsumerRemovalTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmxConsumerRemovalTest.class); + + protected MBeanServer mbeanServer; + protected ManagedRegionBroker regionBroker; + protected Session session; + protected String clientID = "foo"; + + protected Connection connection; + protected boolean transacted; + + public static void main(String[] args) { + TestRunner.run(JmxConsumerRemovalTest.class); + } + + public static Test suite() { + return suite(JmxConsumerRemovalTest.class); + } + + public void testCompositeDestConsumerRemoval() throws Exception { + Map subscriptionMap = getSubscriptionMap(); + int consumersToAdd = 1000; + Set consumers = new HashSet<>(); + + final ActiveMQDestination dest = new ActiveMQQueue("test"); + dest.setCompositeDestinations(new ActiveMQDestination[]{new ActiveMQQueue("test1"), + new ActiveMQQueue("test2"), new ActiveMQQueue("test3")}); + + for (int i = 0; i < consumersToAdd; i++) { + consumers.add(session.createConsumer(dest)); + } + + //Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map + assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500)); + + for (MessageConsumer consumer : consumers) { + consumer.close(); + } + + //Make sure map removed all consumers after close + assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500)); + assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length + + regionBroker.getTopicSubscribers().length, 5000, 500)); + } + + public void testDurableConsumerRemoval() throws Exception { + testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.1")); + } + + public void testDurableConsumerWildcardRemoval() throws Exception { + testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.>")); + + } + public void testDurableConsumerRemoval(ActiveMQDestination dest) throws Exception { + int consumersToAdd = 1000; + Set durables = new HashSet<>(); + + //Create a lot of durables and then + for (int i = 0; i < consumersToAdd; i++) { + durables.add(session.createDurableSubscriber((Topic) dest, "sub" + i)); + } + + //Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map + assertTrue(Wait.waitFor(() -> consumersToAdd == getSubscriptionMap().size(), 5000, 500)); + + for (MessageConsumer consumer : durables) { + consumer.close(); + } + + //Make sure map removed all consumers after close + assertTrue(Wait.waitFor(() -> 0 == regionBroker.getDurableTopicSubscribers().length, 5000, 500)); + //Note we can't check the subscription map as the durables still exist, just offline + } + + public void testQueueConsumerRemoval() throws Exception { + testConsumerRemoval(new ActiveMQQueue("wildcard.queue.1")); + } + + public void testQueueConsumerRemovalWildcard() throws Exception { + testConsumerRemoval(new ActiveMQQueue("wildcard.queue.>")); + } + + public void testTopicConsumerRemoval() throws Exception { + testConsumerRemoval(new ActiveMQTopic("wildcard.topic.1")); + } + + public void testTopicConsumerRemovalWildcard() throws Exception { + testConsumerRemoval(new ActiveMQTopic("wildcard.topic.>")); + } + + private void testConsumerRemoval(ActiveMQDestination dest) throws Exception { + Map subscriptionMap = getSubscriptionMap(); + int consumersToAdd = 1000; + Set consumers = new HashSet<>(); + + for (int i = 0; i < consumersToAdd; i++) { + consumers.add(session.createConsumer(dest)); + } + + //Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map + assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500)); + + for (MessageConsumer consumer : consumers) { + consumer.close(); + } + + //Make sure map removed all consumers after close + assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500)); + assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length && + 0 == regionBroker.getTopicSubscribers().length, 5000, 500)); + } + + private Map getSubscriptionMap() throws Exception { + ManagedRegionBroker regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class); + Field subMapField = ManagedRegionBroker.class.getDeclaredField("subscriptionMap"); + subMapField.setAccessible(true); + return (Map) subMapField.get(regionBroker); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + useTopic = true; + super.setUp(); + mbeanServer = broker.getManagementContext().getMBeanServer(); + regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class); + ((ActiveMQConnectionFactory)connectionFactory).setWatchTopicAdvisories(false); + connection = connectionFactory.createConnection(); + connection.setClientID(clientID); + connection.start(); + session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseJmx(true); + answer.addConnector(bindAddress); + answer.deleteAllMessages(); + return answer; + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + protected void echo(String text) { + LOG.info(text); + } + + /** + * Returns the name of the destination used in this test case + */ + protected String getDestinationString() { + return getClass().getName() + "." + getName(true); + } +}