From 9dd751149f7489f99d430d3f1240f2bfa2e70c69 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 18 Oct 2018 16:49:29 +0100 Subject: [PATCH] AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed --- .../policy/AbortSlowAckConsumerStrategy.java | 27 ++++++++----- .../org/apache/activemq/bugs/AMQ7077Test.java | 38 +++++++++++++++++-- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java index 063fdecfd3..edb0cef956 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java @@ -16,10 +16,7 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -74,12 +71,22 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { return; } - if (getMaxSlowDuration() > 0) { - // For subscriptions that are already slow we mark them again and check below if - // they've exceeded their configured lifetime. - for (SlowConsumerEntry entry : slowConsumers.values()) { - entry.mark(); + + List subscribersDestroyed = new LinkedList(); + // check for removed consumers also + for (Map.Entry entry : slowConsumers.entrySet()) { + if (getMaxSlowDuration() > 0) { + // For subscriptions that are already slow we mark them again and check below if + // they've exceeded their configured lifetime. + entry.getValue().mark(); } + if (!entry.getKey().isSlowConsumer()) { + subscribersDestroyed.add(entry.getKey()); + } + } + + for (Subscription subscription: subscribersDestroyed) { + slowConsumers.remove(subscription); } List disposed = new ArrayList(); @@ -134,7 +141,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { if (!abstractSubscription.isSlowConsumer()) { abstractSubscription.setSlowConsumer(true); for (Destination destination: abstractSubscription.getDestinations()) { - // destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription); + destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java index 8b41a14b27..67d478f464 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java @@ -20,10 +20,13 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,9 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; import java.net.URI; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class AMQ7077Test { @@ -52,7 +59,8 @@ public class AMQ7077Test { AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); strategy.setCheckPeriod(500); strategy.setMaxTimeSinceLastAck(1000); - strategy.setMaxSlowCount(2); + strategy.setMaxSlowDuration(0); + strategy.setMaxSlowCount(4); strategy.setIgnoreIdleConsumers(false); return strategy; } @@ -68,7 +76,6 @@ public class AMQ7077Test { policy.setAdvisoryForSlowConsumers(true); PolicyMap pMap = new PolicyMap(); pMap.put(new ActiveMQQueue(">"), policy); - brokerService.setUseJmx(false); brokerService.setDestinationPolicy(pMap); brokerService.addConnector("tcp://0.0.0.0:0"); brokerService.start(); @@ -84,16 +91,41 @@ public class AMQ7077Test { Destination destination = session.createQueue("DD"); + MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination)); + MessageConsumer consumer = session.createConsumer(destination); // will be idle and can get removed but will be marked slow and now produce an advisory - MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination)); Message message = advisoryConsumer.receive(10000); if (message == null) { message = advisoryConsumer.receive(2000); } assertNotNull("Got advisory", message); connection.close(); + + QueueViewMBean queue = getProxyToQueue(((Queue) destination).getQueueName()); + ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); + assertNotNull(slowConsumerPolicyMBeanName); + + AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean) + brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true); + + assertTrue("slow list is gone on remove", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + TabularData slowOnes = abortPolicy.getSlowConsumers(); + LOG.info("slow ones:" + slowOnes); + return slowOnes.size() == 0; + } + })); + + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; } @After