AMQ-9107 - rework performance improvement for consumer closing in

managed region broker

This new approach just looks matching Subscriptions from the region for the
destination which prevents having to store another map and falls back to
the old approach if something went wrong.
This commit is contained in:
Christopher L. Shannon (cshannon) 2022-11-01 10:45:24 -04:00 committed by Christopher L. Shannon
parent ea10d984d0
commit d46b74d674
2 changed files with 270 additions and 8 deletions

View File

@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@ -42,6 +42,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationInterceptor;
@ -61,6 +62,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -97,8 +99,7 @@ public class ManagedRegionBroker extends RegionBroker {
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
private final Set<ObjectName> registeredMBeans = ConcurrentHashMap.newKeySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
@ -217,7 +218,6 @@ public class ManagedRegionBroker extends RegionBroker {
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
subscriptionMap.put(sub, objectName);
consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
return objectName;
} catch (Exception e) {
LOG.error("Failed to register subscription {}", sub, e);
@ -252,13 +252,62 @@ public class ManagedRegionBroker extends RegionBroker {
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (consumerSubscriptionMap.containsKey(info)){
Subscription sub = consumerSubscriptionMap.get(info);
unregisterSubscription(subscriptionMap.get(sub), true);
//Find subscriptions quickly by relying on the maps contained in the different Regions
//that map consumer ids and subscriptions
final Set<Subscription> subscriptions = findSubscriptions(info);
if (!subscriptions.isEmpty()) {
for (Subscription sub : subscriptions) {
// unregister all consumer subs
unregisterSubscription(subscriptionMap.get(sub), true);
break;
}
} else {
//Fall back to old slow approach where we go through the entire subscription map case something went wrong
//and no subscriptions were found - should generally not happen
for (Subscription sub : subscriptionMap.keySet()) {
if (sub.getConsumerInfo().equals(info)) {
unregisterSubscription(subscriptionMap.get(sub), true);
}
}
}
super.removeConsumer(context, info);
}
private Set<Subscription> findSubscriptions(final ConsumerInfo info) {
final Set<Subscription> subscriptions = new HashSet<>();
try {
if (info.getDestination() != null) {
final ActiveMQDestination consumerDest = info.getDestination();
//If it's composite then go through and find the subscription for every dest in case different
if (consumerDest.isComposite()) {
ActiveMQDestination[] destinations = consumerDest.getCompositeDestinations();
for (ActiveMQDestination destination : destinations) {
addSubscriptionToList(subscriptions, info.getConsumerId(), destination);
}
} else {
//This is the case for a non-composite destination which would be most of the time
addSubscriptionToList(subscriptions, info.getConsumerId(), info.getDestination());
}
}
} catch (Exception e) {
LOG.warn("Error finding subscription {}: {}", info, e.getMessage());
}
return subscriptions;
}
private void addSubscriptionToList(Set<Subscription> subscriptions,
ConsumerId consumerId, ActiveMQDestination dest) throws JMSException {
final Subscription matchingSub = ((AbstractRegion) this.getRegion(dest))
.getSubscriptions().get(consumerId);
if (matchingSub != null) {
subscriptions.add(matchingSub);
}
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
@ -296,7 +345,6 @@ public class ManagedRegionBroker extends RegionBroker {
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
consumerSubscriptionMap.remove(sub.getConsumerInfo());
if (name != null) {
try {
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());

View File

@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JmxConsumerRemovalTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmxConsumerRemovalTest.class);
protected MBeanServer mbeanServer;
protected ManagedRegionBroker regionBroker;
protected Session session;
protected String clientID = "foo";
protected Connection connection;
protected boolean transacted;
public static void main(String[] args) {
TestRunner.run(JmxConsumerRemovalTest.class);
}
public static Test suite() {
return suite(JmxConsumerRemovalTest.class);
}
public void testCompositeDestConsumerRemoval() throws Exception {
Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
int consumersToAdd = 1000;
Set<MessageConsumer> consumers = new HashSet<>();
final ActiveMQDestination dest = new ActiveMQQueue("test");
dest.setCompositeDestinations(new ActiveMQDestination[]{new ActiveMQQueue("test1"),
new ActiveMQQueue("test2"), new ActiveMQQueue("test3")});
for (int i = 0; i < consumersToAdd; i++) {
consumers.add(session.createConsumer(dest));
}
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500));
for (MessageConsumer consumer : consumers) {
consumer.close();
}
//Make sure map removed all consumers after close
assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length +
regionBroker.getTopicSubscribers().length, 5000, 500));
}
public void testDurableConsumerRemoval() throws Exception {
testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
}
public void testDurableConsumerWildcardRemoval() throws Exception {
testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
}
public void testDurableConsumerRemoval(ActiveMQDestination dest) throws Exception {
int consumersToAdd = 1000;
Set<MessageConsumer> durables = new HashSet<>();
//Create a lot of durables and then
for (int i = 0; i < consumersToAdd; i++) {
durables.add(session.createDurableSubscriber((Topic) dest, "sub" + i));
}
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
assertTrue(Wait.waitFor(() -> consumersToAdd == getSubscriptionMap().size(), 5000, 500));
for (MessageConsumer consumer : durables) {
consumer.close();
}
//Make sure map removed all consumers after close
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getDurableTopicSubscribers().length, 5000, 500));
//Note we can't check the subscription map as the durables still exist, just offline
}
public void testQueueConsumerRemoval() throws Exception {
testConsumerRemoval(new ActiveMQQueue("wildcard.queue.1"));
}
public void testQueueConsumerRemovalWildcard() throws Exception {
testConsumerRemoval(new ActiveMQQueue("wildcard.queue.>"));
}
public void testTopicConsumerRemoval() throws Exception {
testConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
}
public void testTopicConsumerRemovalWildcard() throws Exception {
testConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
}
private void testConsumerRemoval(ActiveMQDestination dest) throws Exception {
Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
int consumersToAdd = 1000;
Set<MessageConsumer> consumers = new HashSet<>();
for (int i = 0; i < consumersToAdd; i++) {
consumers.add(session.createConsumer(dest));
}
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500));
for (MessageConsumer consumer : consumers) {
consumer.close();
}
//Make sure map removed all consumers after close
assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length &&
0 == regionBroker.getTopicSubscribers().length, 5000, 500));
}
private Map<Subscription, ObjectName> getSubscriptionMap() throws Exception {
ManagedRegionBroker regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
Field subMapField = ManagedRegionBroker.class.getDeclaredField("subscriptionMap");
subMapField.setAccessible(true);
return (Map<Subscription, ObjectName>) subMapField.get(regionBroker);
}
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:0";
useTopic = true;
super.setUp();
mbeanServer = broker.getManagementContext().getMBeanServer();
regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
((ActiveMQConnectionFactory)connectionFactory).setWatchTopicAdvisories(false);
connection = connectionFactory.createConnection();
connection.setClientID(clientID);
connection.start();
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
super.tearDown();
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setUseJmx(true);
answer.addConnector(bindAddress);
answer.deleteAllMessages();
return answer;
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
}
protected void echo(String text) {
LOG.info(text);
}
/**
* Returns the name of the destination used in this test case
*/
protected String getDestinationString() {
return getClass().getName() + "." + getName(true);
}
}