mirror of https://github.com/apache/activemq.git
AMQ-9107 - rework performance improvement for consumer closing in
managed region broker
This new approach just looks matching Subscriptions from the region for the
destination which prevents having to store another map and falls back to
the old approach if something went wrong.
(cherry picked from commit d46b74d674
)
This commit is contained in:
parent
bc9e728123
commit
8cc7a45455
|
@ -23,9 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import javax.jms.IllegalStateException;
|
||||
import javax.jms.JMSException;
|
||||
import javax.management.InstanceNotFoundException;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -42,6 +42,7 @@ import org.apache.activemq.broker.BrokerService;
|
|||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||
import org.apache.activemq.broker.region.AbstractRegion;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFactory;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
|
@ -61,6 +62,7 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
|
@ -97,8 +99,7 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
|
||||
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
|
||||
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
|
||||
private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
|
||||
private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
|
||||
private final Set<ObjectName> registeredMBeans = ConcurrentHashMap.newKeySet();
|
||||
/* This is the first broker in the broker interceptor chain. */
|
||||
private Broker contextBroker;
|
||||
|
||||
|
@ -217,7 +218,6 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
|
||||
}
|
||||
subscriptionMap.put(sub, objectName);
|
||||
consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
|
||||
return objectName;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to register subscription {}", sub, e);
|
||||
|
@ -252,13 +252,62 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
|
||||
@Override
|
||||
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
if (consumerSubscriptionMap.containsKey(info)){
|
||||
Subscription sub = consumerSubscriptionMap.get(info);
|
||||
unregisterSubscription(subscriptionMap.get(sub), true);
|
||||
//Find subscriptions quickly by relying on the maps contained in the different Regions
|
||||
//that map consumer ids and subscriptions
|
||||
final Set<Subscription> subscriptions = findSubscriptions(info);
|
||||
|
||||
if (!subscriptions.isEmpty()) {
|
||||
for (Subscription sub : subscriptions) {
|
||||
// unregister all consumer subs
|
||||
unregisterSubscription(subscriptionMap.get(sub), true);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
//Fall back to old slow approach where we go through the entire subscription map case something went wrong
|
||||
//and no subscriptions were found - should generally not happen
|
||||
for (Subscription sub : subscriptionMap.keySet()) {
|
||||
if (sub.getConsumerInfo().equals(info)) {
|
||||
unregisterSubscription(subscriptionMap.get(sub), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
super.removeConsumer(context, info);
|
||||
}
|
||||
|
||||
private Set<Subscription> findSubscriptions(final ConsumerInfo info) {
|
||||
final Set<Subscription> subscriptions = new HashSet<>();
|
||||
|
||||
try {
|
||||
if (info.getDestination() != null) {
|
||||
final ActiveMQDestination consumerDest = info.getDestination();
|
||||
//If it's composite then go through and find the subscription for every dest in case different
|
||||
if (consumerDest.isComposite()) {
|
||||
ActiveMQDestination[] destinations = consumerDest.getCompositeDestinations();
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
addSubscriptionToList(subscriptions, info.getConsumerId(), destination);
|
||||
}
|
||||
} else {
|
||||
//This is the case for a non-composite destination which would be most of the time
|
||||
addSubscriptionToList(subscriptions, info.getConsumerId(), info.getDestination());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error finding subscription {}: {}", info, e.getMessage());
|
||||
}
|
||||
|
||||
return subscriptions;
|
||||
}
|
||||
|
||||
private void addSubscriptionToList(Set<Subscription> subscriptions,
|
||||
ConsumerId consumerId, ActiveMQDestination dest) throws JMSException {
|
||||
final Subscription matchingSub = ((AbstractRegion) this.getRegion(dest))
|
||||
.getSubscriptions().get(consumerId);
|
||||
if (matchingSub != null) {
|
||||
subscriptions.add(matchingSub);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
super.addProducer(context, info);
|
||||
|
@ -296,7 +345,6 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
|
||||
public void unregisterSubscription(Subscription sub) {
|
||||
ObjectName name = subscriptionMap.remove(sub);
|
||||
consumerSubscriptionMap.remove(sub.getConsumerInfo());
|
||||
if (name != null) {
|
||||
try {
|
||||
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerInvocationHandler;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import junit.framework.Test;
|
||||
import junit.textui.TestRunner;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public class JmxConsumerRemovalTest extends EmbeddedBrokerTestSupport {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JmxConsumerRemovalTest.class);
|
||||
|
||||
protected MBeanServer mbeanServer;
|
||||
protected ManagedRegionBroker regionBroker;
|
||||
protected Session session;
|
||||
protected String clientID = "foo";
|
||||
|
||||
protected Connection connection;
|
||||
protected boolean transacted;
|
||||
|
||||
public static void main(String[] args) {
|
||||
TestRunner.run(JmxConsumerRemovalTest.class);
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(JmxConsumerRemovalTest.class);
|
||||
}
|
||||
|
||||
public void testCompositeDestConsumerRemoval() throws Exception {
|
||||
Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
|
||||
int consumersToAdd = 1000;
|
||||
Set<MessageConsumer> consumers = new HashSet<>();
|
||||
|
||||
final ActiveMQDestination dest = new ActiveMQQueue("test");
|
||||
dest.setCompositeDestinations(new ActiveMQDestination[]{new ActiveMQQueue("test1"),
|
||||
new ActiveMQQueue("test2"), new ActiveMQQueue("test3")});
|
||||
|
||||
for (int i = 0; i < consumersToAdd; i++) {
|
||||
consumers.add(session.createConsumer(dest));
|
||||
}
|
||||
|
||||
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
|
||||
assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500));
|
||||
|
||||
for (MessageConsumer consumer : consumers) {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
//Make sure map removed all consumers after close
|
||||
assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
|
||||
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length +
|
||||
regionBroker.getTopicSubscribers().length, 5000, 500));
|
||||
}
|
||||
|
||||
public void testDurableConsumerRemoval() throws Exception {
|
||||
testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
|
||||
}
|
||||
|
||||
public void testDurableConsumerWildcardRemoval() throws Exception {
|
||||
testDurableConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
|
||||
|
||||
}
|
||||
public void testDurableConsumerRemoval(ActiveMQDestination dest) throws Exception {
|
||||
int consumersToAdd = 1000;
|
||||
Set<MessageConsumer> durables = new HashSet<>();
|
||||
|
||||
//Create a lot of durables and then
|
||||
for (int i = 0; i < consumersToAdd; i++) {
|
||||
durables.add(session.createDurableSubscriber((Topic) dest, "sub" + i));
|
||||
}
|
||||
|
||||
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
|
||||
assertTrue(Wait.waitFor(() -> consumersToAdd == getSubscriptionMap().size(), 5000, 500));
|
||||
|
||||
for (MessageConsumer consumer : durables) {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
//Make sure map removed all consumers after close
|
||||
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getDurableTopicSubscribers().length, 5000, 500));
|
||||
//Note we can't check the subscription map as the durables still exist, just offline
|
||||
}
|
||||
|
||||
public void testQueueConsumerRemoval() throws Exception {
|
||||
testConsumerRemoval(new ActiveMQQueue("wildcard.queue.1"));
|
||||
}
|
||||
|
||||
public void testQueueConsumerRemovalWildcard() throws Exception {
|
||||
testConsumerRemoval(new ActiveMQQueue("wildcard.queue.>"));
|
||||
}
|
||||
|
||||
public void testTopicConsumerRemoval() throws Exception {
|
||||
testConsumerRemoval(new ActiveMQTopic("wildcard.topic.1"));
|
||||
}
|
||||
|
||||
public void testTopicConsumerRemovalWildcard() throws Exception {
|
||||
testConsumerRemoval(new ActiveMQTopic("wildcard.topic.>"));
|
||||
}
|
||||
|
||||
private void testConsumerRemoval(ActiveMQDestination dest) throws Exception {
|
||||
Map<Subscription, ObjectName> subscriptionMap = getSubscriptionMap();
|
||||
int consumersToAdd = 1000;
|
||||
Set<MessageConsumer> consumers = new HashSet<>();
|
||||
|
||||
for (int i = 0; i < consumersToAdd; i++) {
|
||||
consumers.add(session.createConsumer(dest));
|
||||
}
|
||||
|
||||
//Create a lot of consumers and make sure they are all tracked in ManagedRegionBroker map
|
||||
assertTrue(Wait.waitFor(() -> consumersToAdd == subscriptionMap.size(), 5000, 500));
|
||||
|
||||
for (MessageConsumer consumer : consumers) {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
//Make sure map removed all consumers after close
|
||||
assertTrue(Wait.waitFor(() -> 0 == subscriptionMap.size(), 5000, 500));
|
||||
assertTrue(Wait.waitFor(() -> 0 == regionBroker.getQueueSubscribers().length &&
|
||||
0 == regionBroker.getTopicSubscribers().length, 5000, 500));
|
||||
}
|
||||
|
||||
private Map<Subscription, ObjectName> getSubscriptionMap() throws Exception {
|
||||
ManagedRegionBroker regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
|
||||
Field subMapField = ManagedRegionBroker.class.getDeclaredField("subscriptionMap");
|
||||
subMapField.setAccessible(true);
|
||||
return (Map<Subscription, ObjectName>) subMapField.get(regionBroker);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
bindAddress = "tcp://localhost:0";
|
||||
useTopic = true;
|
||||
super.setUp();
|
||||
mbeanServer = broker.getManagementContext().getMBeanServer();
|
||||
regionBroker = (ManagedRegionBroker) broker.getBroker().getAdaptor(ManagedRegionBroker.class);
|
||||
((ActiveMQConnectionFactory)connectionFactory).setWatchTopicAdvisories(false);
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.setClientID(clientID);
|
||||
connection.start();
|
||||
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setUseJmx(true);
|
||||
answer.addConnector(bindAddress);
|
||||
answer.deleteAllMessages();
|
||||
return answer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
|
||||
}
|
||||
|
||||
protected void echo(String text) {
|
||||
LOG.info(text);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the destination used in this test case
|
||||
*/
|
||||
protected String getDestinationString() {
|
||||
return getClass().getName() + "." + getName(true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue