diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java index 811839d2cf..1bbca520d4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java @@ -44,7 +44,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { private final Map destinations = new ConcurrentHashMap(); private long maxTimeSinceLastAck = 30*1000; private boolean ignoreIdleConsumers = true; - private boolean ignoreNetworkConsumers = true; public AbortSlowAckConsumerStrategy() { this.name = "AbortSlowAckConsumerStrategy@" + hashCode(); @@ -215,34 +214,4 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) { this.ignoreIdleConsumers = ignoreIdleConsumers; } - - /** - * Returns whether the strategy is configured to ignore subscriptions that are from a network - * connection. - * - * @return true if the strategy will ignore network connection subscriptions when looking - * for slow consumers. - */ - public boolean isIgnoreNetworkSubscriptions() { - return ignoreNetworkConsumers; - } - - /** - * Sets whether the strategy is configured to ignore consumers that are part of a network - * connection to another broker. - * - * When configured to not ignore idle consumers this strategy acts not only on consumers - * that are actually slow but also on any consumer that has not received any messages for - * the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also - * aborting slow consumers however for a network subscription this can create a lot of - * unnecessary churn and if the abort connection option is also enabled this can result - * in the entire network connection being torn down and rebuilt for no reason. - * - * @param ignoreNetworkConsumers - * Should this strategy ignore subscriptions made by a network connector. - */ - public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) { - this.ignoreNetworkConsumers = ignoreNetworkConsumers; - } - } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java index fe6ba44e88..62d583f501 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java @@ -57,6 +57,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable private long maxSlowDuration = 30*1000; private long checkPeriod = 30*1000; private boolean abortConnection = false; + private boolean ignoreNetworkConsumers = true; @Override public void setBrokerService(Broker broker) { @@ -94,6 +95,14 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable HashMap toAbort = new HashMap(); for (Entry entry : slowConsumers.entrySet()) { + Subscription subscription = entry.getKey(); + if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) { + if (slowConsumers.remove(subscription) != null) { + LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId()); + } + continue; + } + if (entry.getKey().isSlowConsumer()) { if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration) || maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) { @@ -269,6 +278,35 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable this.abortConnection = abortConnection; } + /** + * Returns whether the strategy is configured to ignore subscriptions that are from a network + * connection. + * + * @return true if the strategy will ignore network connection subscriptions when looking + * for slow consumers. + */ + public boolean isIgnoreNetworkSubscriptions() { + return ignoreNetworkConsumers; + } + + /** + * Sets whether the strategy is configured to ignore consumers that are part of a network + * connection to another broker. + * + * When configured to not ignore idle consumers this strategy acts not only on consumers + * that are actually slow but also on any consumer that has not received any messages for + * the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also + * aborting slow consumers however for a network subscription this can create a lot of + * unnecessary churn and if the abort connection option is also enabled this can result + * in the entire network connection being torn down and rebuilt for no reason. + * + * @param ignoreNetworkConsumers + * Should this strategy ignore subscriptions made by a network connector. + */ + public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) { + this.ignoreNetworkConsumers = ignoreNetworkConsumers; + } + public void setName(String name) { this.name = name; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java index 3cfd59545d..92225d71f9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java @@ -34,17 +34,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @RunWith(value = Parameterized.class) public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { - private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class); - protected long maxTimeSinceLastAck = 5 * 1000; - AbortSlowAckConsumerStrategy strategy; + protected long maxTimeSinceLastAck = 5 * 1000; + protected AbortSlowAckConsumerStrategy strategy; public AbortSlowAckConsumer0Test(Boolean isTopic) { super(isTopic); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java index 9f2344350d..85d37b90ff 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumer0Test.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.broker.policy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.lang.reflect.UndeclaredThrowableException; import java.util.Arrays; import java.util.Collection; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; @@ -30,6 +36,7 @@ import javax.management.InstanceNotFoundException; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQPrefetchPolicy; @@ -44,15 +51,10 @@ import org.apache.activemq.util.SocketProxy; import org.apache.activemq.util.Wait; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.BlockJUnit4ClassRunner; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - - @RunWith(value = Parameterized.class) public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java index ee28112dbe..2ae05a0a27 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerBase.java @@ -16,40 +16,21 @@ */ package org.apache.activemq.broker.policy; -import junit.framework.Test; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + import org.apache.activemq.JmsMultipleClientsTestSupport; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; -import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.util.MessageIdList; import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.management.InstanceNotFoundException; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener { - private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class); - protected AbortSlowConsumerStrategy underTest; protected boolean abortConnection = false; protected long checkPeriod = 2 * 1000; @@ -92,5 +73,4 @@ public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport impleme exceptions.add(exception); exception.printStackTrace(); } - }