From 4dee173962a08dbcb88e33f61ec281209c0b9156 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 17 Oct 2018 11:04:47 +0100 Subject: [PATCH] AMQ-7077 AMQ-6421 - tie queue consumer slow flag detection to abort slow consumer policy, fire advisory when set. --- .../amqp/interop/AmqpSlowReceiverTest.java | 11 +- .../broker/region/AbstractSubscription.java | 3 + .../broker/region/PrefetchSubscription.java | 2 +- .../policy/AbortSlowAckConsumerStrategy.java | 10 ++ .../policy/AbortSlowAckConsumer0Test.java | 10 +- .../policy/AbortSlowAckConsumer1Test.java | 12 ++ .../policy/AbortSlowAckConsumer2Test.java | 9 ++ .../broker/policy/AbortSlowConsumer0Test.java | 18 ++- .../broker/policy/AbortSlowConsumer1Test.java | 4 +- .../broker/policy/AbortSlowConsumer2Test.java | 2 +- .../broker/policy/AbortSlowConsumerBase.java | 1 - .../org/apache/activemq/bugs/AMQ5844Test.java | 4 +- .../org/apache/activemq/bugs/AMQ7077Test.java | 104 ++++++++++++++++++ 13 files changed, 167 insertions(+), 23 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java index 86626a8e70..d064af8369 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSlowReceiverTest.java @@ -32,7 +32,7 @@ import javax.management.openmbean.TabularData; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -45,14 +45,14 @@ import org.apache.activemq.util.Wait; import org.junit.Test; /** - * Test the handling of consumer abort when the AbortSlowConsumerStrategy is used. + * Test the handling of consumer abort when the AbortSlowAckConsumerStrategy is used. */ public class AmqpSlowReceiverTest extends AmqpClientTestSupport { private final long DEFAULT_CHECK_PERIOD = 1000; private final long DEFAULT_MAX_SLOW_DURATION = 3000; - private AbortSlowConsumerStrategy strategy; + private AbortSlowAckConsumerStrategy strategy; @Test(timeout = 60 * 1000) public void testSlowConsumerIsAborted() throws Exception { @@ -105,7 +105,7 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport { AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean) brokerService.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(6); TabularData slowOnes = abortPolicy.getSlowConsumers(); assertEquals("one slow consumers", 1, slowOnes.size()); @@ -148,10 +148,11 @@ public class AmqpSlowReceiverTest extends AmqpClientTestSupport { @Override protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception { - strategy = new AbortSlowConsumerStrategy(); + strategy = new AbortSlowAckConsumerStrategy(); strategy.setAbortConnection(false); strategy.setCheckPeriod(DEFAULT_CHECK_PERIOD); strategy.setMaxSlowDuration(DEFAULT_MAX_SLOW_DURATION); + strategy.setMaxTimeSinceLastAck(DEFAULT_MAX_SLOW_DURATION); PolicyEntry policy = new PolicyEntry(); policy.setSlowConsumerStrategy(strategy); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index fae9c7c45c..9f7e8e4908 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -363,4 +363,7 @@ public abstract class AbstractSubscription implements Subscription { } } + public CopyOnWriteArrayList getDestinations() { + return destinations; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 8f1ac0a360..a265570c94 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -671,7 +671,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // decrement after dispatch has taken ownership to avoid usage jitter node.decrementReferenceCount(); } - } else if (!isSlowConsumer()) { + } else if (!pending.isEmpty() && !isSlowConsumer()) { setSlowConsumer(true); slowConsumerTargets = destinations; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java index 1bbca520d4..063fdecfd3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.AbstractSubscription; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.slf4j.Logger; @@ -128,6 +129,15 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId()); SlowConsumerEntry entry = new SlowConsumerEntry(subscriber.getContext()); entry.mark(); // mark consumer on first run + if (subscriber instanceof AbstractSubscription) { + AbstractSubscription abstractSubscription = (AbstractSubscription) subscriber; + if (!abstractSubscription.isSlowConsumer()) { + abstractSubscription.setSlowConsumer(true); + for (Destination destination: abstractSubscription.getDestinations()) { + // destination.slowConsumer(broker.getAdminConnectionContext(), abstractSubscription); + } + } + } slowConsumers.put(subscriber, entry); } else if (getMaxSlowCount() > 0) { slowConsumers.get(subscriber).slow(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java index 92225d71f9..26cf28d74c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java @@ -19,6 +19,8 @@ package org.apache.activemq.broker.policy; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.TimeUnit; import javax.jms.ConnectionFactory; @@ -42,8 +44,14 @@ public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { protected long maxTimeSinceLastAck = 5 * 1000; protected AbortSlowAckConsumerStrategy strategy; + @Parameterized.Parameters(name = "isTopic({0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + public AbortSlowAckConsumer0Test(Boolean isTopic) { - super(isTopic); + super(); + this.topic = isTopic; } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java index 6d3e970c78..a9ccb447e5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer1Test.java @@ -26,11 +26,23 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + @RunWith(value = Parameterized.class) public class AbortSlowAckConsumer1Test extends AbortSlowConsumer1Test { protected long maxTimeSinceLastAck = 5 * 1000; + @Parameterized.Parameters(name = "abortConnection({0})-isTopic({1})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{ + {Boolean.TRUE, Boolean.TRUE}, + {Boolean.FALSE, Boolean.TRUE}, + {Boolean.TRUE, Boolean.FALSE}, + {Boolean.FALSE, Boolean.FALSE}}); + } + public AbortSlowAckConsumer1Test(Boolean abortConnection, Boolean topic) { super(abortConnection, topic); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java index 948613e0c8..b8bdd7cfda 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer2Test.java @@ -26,11 +26,20 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + @RunWith(value = Parameterized.class) public class AbortSlowAckConsumer2Test extends AbortSlowConsumer2Test { protected long maxTimeSinceLastAck = 5 * 1000; + + @Parameterized.Parameters(name = "isTopic({0})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + public AbortSlowAckConsumer2Test(Boolean topic) { super(topic); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java index 85d37b90ff..d04594996a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java @@ -49,24 +49,19 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.SocketProxy; import org.apache.activemq.util.Wait; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(value = Parameterized.class) public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class); - @Parameterized.Parameters(name = "isTopic({0})") - public static Collection getTestParameters() { - return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); - } - - public AbortSlowConsumer0Test(Boolean isTopic) { - this.topic = isTopic; + public AbortSlowConsumer0Test() { + this.topic = true; } @Test @@ -125,8 +120,11 @@ public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { assertEquals("no slow consumers left", 0, slowOnes.size()); // verify mbean gone with destination - broker.getAdminView().removeTopic(amqDest.getPhysicalName()); - + if (topic) { + broker.getAdminView().removeTopic(amqDest.getPhysicalName()); + } else { + broker.getAdminView().removeQueue(amqDest.getPhysicalName()); + } try { abortPolicy.getSlowConsumers(); fail("expect not found post destination removal"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java index e17b362cf6..38ed65f2b1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer1Test.java @@ -42,9 +42,7 @@ public class AbortSlowConsumer1Test extends AbortSlowConsumerBase { public static Collection getTestParameters() { return Arrays.asList(new Object[][]{ {Boolean.TRUE, Boolean.TRUE}, - {Boolean.TRUE, Boolean.FALSE}, - {Boolean.FALSE, Boolean.TRUE}, - {Boolean.FALSE, Boolean.FALSE}}); + {Boolean.FALSE, Boolean.TRUE}}); } public AbortSlowConsumer1Test(Boolean abortConnection, Boolean topic) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java index 72630278cc..051d4cd548 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer2Test.java @@ -31,7 +31,7 @@ public class AbortSlowConsumer2Test extends AbortSlowConsumerBase { @Parameterized.Parameters(name = "isTopic({0})") public static Collection getTestParameters() { - return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + return Arrays.asList(new Object[][]{{Boolean.TRUE}}); } public AbortSlowConsumer2Test(Boolean isTopic) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java index 2ae05a0a27..4c882e22a3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java @@ -41,7 +41,6 @@ public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport impleme @Before public void setUp() throws Exception { exceptions.clear(); - topic = true; underTest = createSlowConsumerStrategy(); super.setUp(); createDestination(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5844Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5844Test.java index 64851aecab..17db041ac8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5844Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5844Test.java @@ -20,6 +20,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -108,12 +109,13 @@ public class AMQ5844Test { broker.setBrokerName("Main"); PolicyEntry policy = new PolicyEntry(); - AbortSlowConsumerStrategy abortSlowConsumerStrategy = new AbortSlowConsumerStrategy(); + AbortSlowAckConsumerStrategy abortSlowConsumerStrategy = new AbortSlowAckConsumerStrategy(); abortSlowConsumerStrategy.setAbortConnection(false); //time in milliseconds between checks for slow subscriptions abortSlowConsumerStrategy.setCheckPeriod(checkPeriod); //time in milliseconds that a sub can remain slow before triggering an abort abortSlowConsumerStrategy.setMaxSlowDuration(maxSlowDuration); + abortSlowConsumerStrategy.setMaxTimeSinceLastAck(maxSlowDuration); policy.setSlowConsumerStrategy(abortSlowConsumerStrategy); policy.setQueuePrefetch(0); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java new file mode 100644 index 0000000000..8b41a14b27 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7077Test.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.net.URI; + +import static org.junit.Assert.assertNotNull; + +public class AMQ7077Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ7077Test.class); + + private BrokerService brokerService; + private String connectionUri; + + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri); + conFactory.setWatchTopicAdvisories(false); + return conFactory; + } + + protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() { + AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); + strategy.setCheckPeriod(500); + strategy.setMaxTimeSinceLastAck(1000); + strategy.setMaxSlowCount(2); + strategy.setIgnoreIdleConsumers(false); + return strategy; + } + + @Before + public void setUp() throws Exception { + brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true")); + PolicyEntry policy = new PolicyEntry(); + + policy.setSlowConsumerStrategy(createSlowConsumerStrategy()); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + policy.setAdvisoryForSlowConsumers(true); + PolicyMap pMap = new PolicyMap(); + pMap.put(new ActiveMQQueue(">"), policy); + brokerService.setUseJmx(false); + brokerService.setDestinationPolicy(pMap); + brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + + connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + } + + @Test + public void testAdvisoryOnSlowAckDetection() throws Exception { + Connection connection = createConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination = session.createQueue("DD"); + + MessageConsumer consumer = session.createConsumer(destination); + // will be idle and can get removed but will be marked slow and now produce an advisory + + MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(destination)); + Message message = advisoryConsumer.receive(10000); + if (message == null) { + message = advisoryConsumer.receive(2000); + } + assertNotNull("Got advisory", message); + connection.close(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } +}