Move option ignoreNetworkConsumers up to base AbortSlowConsumerStrategy
so it can be used for both the original version and the slow ack aware
version.
This commit is contained in:
Timothy Bish 2015-04-20 15:20:33 -04:00
parent a65ac586c2
commit a0835c2c21
5 changed files with 53 additions and 68 deletions

View File

@ -44,7 +44,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
private final Map<String, Destination> destinations = new ConcurrentHashMap<String, Destination>(); private final Map<String, Destination> destinations = new ConcurrentHashMap<String, Destination>();
private long maxTimeSinceLastAck = 30*1000; private long maxTimeSinceLastAck = 30*1000;
private boolean ignoreIdleConsumers = true; private boolean ignoreIdleConsumers = true;
private boolean ignoreNetworkConsumers = true;
public AbortSlowAckConsumerStrategy() { public AbortSlowAckConsumerStrategy() {
this.name = "AbortSlowAckConsumerStrategy@" + hashCode(); this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
@ -215,34 +214,4 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) { public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
this.ignoreIdleConsumers = ignoreIdleConsumers; this.ignoreIdleConsumers = ignoreIdleConsumers;
} }
/**
* Returns whether the strategy is configured to ignore subscriptions that are from a network
* connection.
*
* @return true if the strategy will ignore network connection subscriptions when looking
* for slow consumers.
*/
public boolean isIgnoreNetworkSubscriptions() {
return ignoreNetworkConsumers;
}
/**
* Sets whether the strategy is configured to ignore consumers that are part of a network
* connection to another broker.
*
* When configured to not ignore idle consumers this strategy acts not only on consumers
* that are actually slow but also on any consumer that has not received any messages for
* the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also
* aborting slow consumers however for a network subscription this can create a lot of
* unnecessary churn and if the abort connection option is also enabled this can result
* in the entire network connection being torn down and rebuilt for no reason.
*
* @param ignoreNetworkConsumers
* Should this strategy ignore subscriptions made by a network connector.
*/
public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
this.ignoreNetworkConsumers = ignoreNetworkConsumers;
}
} }

View File

@ -57,6 +57,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
private long maxSlowDuration = 30*1000; private long maxSlowDuration = 30*1000;
private long checkPeriod = 30*1000; private long checkPeriod = 30*1000;
private boolean abortConnection = false; private boolean abortConnection = false;
private boolean ignoreNetworkConsumers = true;
@Override @Override
public void setBrokerService(Broker broker) { public void setBrokerService(Broker broker) {
@ -94,6 +95,14 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>(); HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) { for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
Subscription subscription = entry.getKey();
if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) {
if (slowConsumers.remove(subscription) != null) {
LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId());
}
continue;
}
if (entry.getKey().isSlowConsumer()) { if (entry.getKey().isSlowConsumer()) {
if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration) if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration)
|| maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) { || maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) {
@ -269,6 +278,35 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
this.abortConnection = abortConnection; this.abortConnection = abortConnection;
} }
/**
* Returns whether the strategy is configured to ignore subscriptions that are from a network
* connection.
*
* @return true if the strategy will ignore network connection subscriptions when looking
* for slow consumers.
*/
public boolean isIgnoreNetworkSubscriptions() {
return ignoreNetworkConsumers;
}
/**
* Sets whether the strategy is configured to ignore consumers that are part of a network
* connection to another broker.
*
* When configured to not ignore idle consumers this strategy acts not only on consumers
* that are actually slow but also on any consumer that has not received any messages for
* the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also
* aborting slow consumers however for a network subscription this can create a lot of
* unnecessary churn and if the abort connection option is also enabled this can result
* in the entire network connection being torn down and rebuilt for no reason.
*
* @param ignoreNetworkConsumers
* Should this strategy ignore subscriptions made by a network connector.
*/
public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
this.ignoreNetworkConsumers = ignoreNetworkConsumers;
}
public void setName(String name) { public void setName(String name) {
this.name = name; this.name = name;
} }

View File

@ -34,17 +34,13 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumer0Test.class);
protected long maxTimeSinceLastAck = 5 * 1000;
AbortSlowAckConsumerStrategy strategy; protected long maxTimeSinceLastAck = 5 * 1000;
protected AbortSlowAckConsumerStrategy strategy;
public AbortSlowAckConsumer0Test(Boolean isTopic) { public AbortSlowAckConsumer0Test(Boolean isTopic) {
super(isTopic); super(isTopic);

View File

@ -16,11 +16,17 @@
*/ */
package org.apache.activemq.broker.policy; package org.apache.activemq.broker.policy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -30,6 +36,7 @@ import javax.management.InstanceNotFoundException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
@ -44,15 +51,10 @@ import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
public class AbortSlowConsumer0Test extends AbortSlowConsumerBase { public class AbortSlowConsumer0Test extends AbortSlowConsumerBase {

View File

@ -16,40 +16,21 @@
*/ */
package org.apache.activemq.broker.policy; package org.apache.activemq.broker.policy;
import junit.framework.Test; import java.util.ArrayList;
import java.util.List;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.activemq.JmsMultipleClientsTestSupport; import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.junit.Before; import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener { public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport implements ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerBase.class);
protected AbortSlowConsumerStrategy underTest; protected AbortSlowConsumerStrategy underTest;
protected boolean abortConnection = false; protected boolean abortConnection = false;
protected long checkPeriod = 2 * 1000; protected long checkPeriod = 2 * 1000;
@ -92,5 +73,4 @@ public class AbortSlowConsumerBase extends JmsMultipleClientsTestSupport impleme
exceptions.add(exception); exceptions.add(exception);
exception.printStackTrace(); exception.printStackTrace();
} }
} }