Merge pull request #908 from lucastetreault/AMQ-9107

[AMQ-9107] Remove consumers more efficiently in ManagedRegionBroker
This commit is contained in:
Jean-Baptiste Onofré 2022-10-13 05:58:04 +02:00 committed by GitHub
commit 96a011d4b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 26 additions and 24 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import javax.jms.IllegalStateException;
import javax.management.InstanceNotFoundException;
@ -79,23 +80,24 @@ public class ManagedRegionBroker extends RegionBroker {
private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
private final ManagementContext managementContext;
private final ObjectName brokerObjectName;
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<>();
private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<>();
private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<>();
private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<>();
private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<>();
private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<>();
private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<>();
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<>();
private final Map<ConsumerInfo, Subscription> consumerSubscriptionMap = new ConcurrentHashMap<>();
private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
@ -215,6 +217,7 @@ public class ManagedRegionBroker extends RegionBroker {
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
subscriptionMap.put(sub, objectName);
consumerSubscriptionMap.put(sub.getConsumerInfo(), sub);
return objectName;
} catch (Exception e) {
LOG.error("Failed to register subscription {}", sub, e);
@ -249,12 +252,10 @@ public class ManagedRegionBroker extends RegionBroker {
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
for (Subscription sub : subscriptionMap.keySet()) {
if (sub.getConsumerInfo().equals(info)) {
// unregister all consumer subs
if (consumerSubscriptionMap.containsKey(info)){
Subscription sub = consumerSubscriptionMap.get(info);
unregisterSubscription(subscriptionMap.get(sub), true);
}
}
super.removeConsumer(context, info);
}
@ -295,6 +296,7 @@ public class ManagedRegionBroker extends RegionBroker {
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
consumerSubscriptionMap.remove(sub.getConsumerInfo());
if (name != null) {
try {
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
@ -484,7 +486,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected void buildExistingSubscriptions() throws Exception {
Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<>();
Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
if (destinations != null) {
for (ActiveMQDestination dest : destinations) {
@ -609,7 +611,7 @@ public class ManagedRegionBroker extends RegionBroker {
}
private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){
List<ObjectName> nonSuppressed = new ArrayList<ObjectName>();
List<ObjectName> nonSuppressed = new ArrayList<>();
for(ObjectName key : set){
if (managementContext.isAllowedToRegister(key)){
nonSuppressed.add(key);