AMQ-7079 AMQ-7077 AMQ-6421 - check for consumers that have been destroyed

This commit is contained in:
gtully 2018-10-18 16:49:29 +01:00
parent b9c8f6228c
commit 9dd751149f
2 changed files with 52 additions and 13 deletions

View File

@ -16,10 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@ -74,12 +71,22 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
return;
}
if (getMaxSlowDuration() > 0) {
// For subscriptions that are already slow we mark them again and check below if
// they've exceeded their configured lifetime.
for (SlowConsumerEntry entry : slowConsumers.values()) {
entry.mark();
List<Subscription> subscribersDestroyed = new LinkedList<Subscription>();
// check for removed consumers also
for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
if (getMaxSlowDuration() > 0) {
// For subscriptions that are already slow we mark them again and check below if
// they've exceeded their configured lifetime.
entry.getValue().mark();
}
if (!entry.getKey().isSlowConsumer()) {
subscribersDestroyed.add(entry.getKey());
}
}
for (Subscription subscription: subscribersDestroyed) {
slowConsumers.remove(subscription);
}
List<Destination> disposed = new ArrayList<Destination>();
@ -134,7 +141,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
if (!abstractSubscription.isSlowConsumer()) {
abstractSubscription.setSlowConsumer(true);
for (Destination destination: abstractSubscription.getDestinations()) {
// destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription);
}
}
}

View File

@ -20,10 +20,13 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -31,9 +34,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.TabularData;
import java.net.URI;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AMQ7077Test {
@ -52,7 +59,8 @@ public class AMQ7077Test {
AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
strategy.setCheckPeriod(500);
strategy.setMaxTimeSinceLastAck(1000);
strategy.setMaxSlowCount(2);
strategy.setMaxSlowDuration(0);
strategy.setMaxSlowCount(4);
strategy.setIgnoreIdleConsumers(false);
return strategy;
}
@ -68,7 +76,6 @@ public class AMQ7077Test {
policy.setAdvisoryForSlowConsumers(true);
PolicyMap pMap = new PolicyMap();
pMap.put(new ActiveMQQueue(">"), policy);
brokerService.setUseJmx(false);
brokerService.setDestinationPolicy(pMap);
brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
@ -84,16 +91,41 @@ public class AMQ7077Test {
Destination destination = session.createQueue("DD");
MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
MessageConsumer consumer = session.createConsumer(destination);
// will be idle and can get removed but will be marked slow and now produce an advisory
MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination));
Message message = advisoryConsumer.receive(10000);
if (message == null) {
message = advisoryConsumer.receive(2000);
}
assertNotNull("Got advisory", message);
connection.close();
QueueViewMBean queue = getProxyToQueue(((Queue) destination).getQueueName());
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
assertNotNull(slowConsumerPolicyMBeanName);
AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
assertTrue("slow list is gone on remove", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
TabularData slowOnes = abortPolicy.getSlowConsumers();
LOG.info("slow ones:" + slowOnes);
return slowOnes.size() == 0;
}
}));
}
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
@After