mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-378 - add AbortSlowConsumerStrategy destination policy that will abort consumers that are repeatildy slow or slow for a defined period. Slowness is a product of the prefetch and message production rate.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@946138 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
50daa35a32
commit
1b6d397aab
|
@ -48,6 +48,7 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
private BooleanExpression selectorExpression;
|
private BooleanExpression selectorExpression;
|
||||||
private ObjectName objectName;
|
private ObjectName objectName;
|
||||||
private int cursorMemoryHighWaterMark = 70;
|
private int cursorMemoryHighWaterMark = 70;
|
||||||
|
private boolean slowConsumer;
|
||||||
|
|
||||||
|
|
||||||
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
|
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
|
||||||
|
@ -162,6 +163,14 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
public boolean isRecoveryRequired() {
|
public boolean isRecoveryRequired() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSlowConsumer() {
|
||||||
|
return slowConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSlowConsumer(boolean val) {
|
||||||
|
slowConsumer = val;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
|
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
@ -81,6 +82,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
|
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
|
||||||
protected int cursorMemoryHighWaterMark = 70;
|
protected int cursorMemoryHighWaterMark = 70;
|
||||||
protected int storeUsageHighWaterMark = 100;
|
protected int storeUsageHighWaterMark = 100;
|
||||||
|
private SlowConsumerStrategy slowConsumerStrategy;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param broker
|
* @param broker
|
||||||
|
@ -449,6 +451,9 @@ public abstract class BaseDestination implements Destination {
|
||||||
if (advisoryForSlowConsumers) {
|
if (advisoryForSlowConsumers) {
|
||||||
broker.slowConsumer(context, this, subs);
|
broker.slowConsumer(context, this, subs);
|
||||||
}
|
}
|
||||||
|
if (slowConsumerStrategy != null) {
|
||||||
|
slowConsumerStrategy.slowConsumer(context, subs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -573,5 +578,9 @@ public abstract class BaseDestination implements Destination {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Log getLog();
|
protected abstract Log getLog();
|
||||||
|
|
||||||
|
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
|
||||||
|
this.slowConsumerStrategy = slowConsumerStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -262,6 +262,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
||||||
}
|
}
|
||||||
dispatched.clear();
|
dispatched.clear();
|
||||||
}
|
}
|
||||||
|
setSlowConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -70,8 +70,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
private final Object pendingLock = new Object();
|
private final Object pendingLock = new Object();
|
||||||
private final Object dispatchLock = new Object();
|
private final Object dispatchLock = new Object();
|
||||||
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
||||||
private boolean slowConsumer;
|
|
||||||
|
|
||||||
private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
|
private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
|
||||||
|
|
||||||
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
|
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
|
||||||
|
@ -565,7 +563,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
try {
|
try {
|
||||||
int numberToDispatch = countBeforeFull();
|
int numberToDispatch = countBeforeFull();
|
||||||
if (numberToDispatch > 0) {
|
if (numberToDispatch > 0) {
|
||||||
slowConsumer=false;
|
setSlowConsumer(false);
|
||||||
pending.setMaxBatchSize(numberToDispatch);
|
pending.setMaxBatchSize(numberToDispatch);
|
||||||
int count = 0;
|
int count = 0;
|
||||||
pending.reset();
|
pending.reset();
|
||||||
|
@ -598,15 +596,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else {
|
} else if (!isSlowConsumer()) {
|
||||||
if (!slowConsumer) {
|
setSlowConsumer(true);
|
||||||
slowConsumer=true;
|
for (Destination dest :destinations) {
|
||||||
ConnectionContext c = new ConnectionContext();
|
dest.slowConsumer(context, this);
|
||||||
c.setBroker(context.getBroker());
|
|
||||||
for (Destination dest :destinations) {
|
|
||||||
dest.slowConsumer(c,this);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -103,6 +103,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
|
setSlowConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -227,4 +227,6 @@ public interface Subscription extends SubscriptionRecovery {
|
||||||
public int getCursorMemoryHighWaterMark();
|
public int getCursorMemoryHighWaterMark();
|
||||||
|
|
||||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
|
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
|
||||||
|
|
||||||
|
boolean isSlowConsumer();
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,6 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
private final AtomicLong enqueueCounter = new AtomicLong(0);
|
private final AtomicLong enqueueCounter = new AtomicLong(0);
|
||||||
private final AtomicLong dequeueCounter = new AtomicLong(0);
|
private final AtomicLong dequeueCounter = new AtomicLong(0);
|
||||||
private int memoryUsageHighWaterMark = 95;
|
private int memoryUsageHighWaterMark = 95;
|
||||||
private boolean slowConsumer;
|
|
||||||
|
|
||||||
// allow duplicate suppression in a ring network of brokers
|
// allow duplicate suppression in a ring network of brokers
|
||||||
protected int maxProducersToAudit = 1024;
|
protected int maxProducersToAudit = 1024;
|
||||||
protected int maxAuditDepth = 1000;
|
protected int maxAuditDepth = 1000;
|
||||||
|
@ -99,11 +97,11 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
// if maximumPendingMessages is set we will only discard messages which
|
// if maximumPendingMessages is set we will only discard messages which
|
||||||
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
|
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)
|
||||||
dispatch(node);
|
dispatch(node);
|
||||||
slowConsumer=false;
|
setSlowConsumer(false);
|
||||||
} else {
|
} else {
|
||||||
//we are slow
|
//we are slow
|
||||||
if(!slowConsumer) {
|
if(!isSlowConsumer()) {
|
||||||
slowConsumer=true;
|
setSlowConsumer(true);
|
||||||
for (Destination dest: destinations) {
|
for (Destination dest: destinations) {
|
||||||
dest.slowConsumer(getContext(), this);
|
dest.slowConsumer(getContext(), this);
|
||||||
}
|
}
|
||||||
|
@ -540,6 +538,7 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
LOG.warn("Failed to destroy cursor", e);
|
LOG.warn("Failed to destroy cursor", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
setSlowConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getPrefetchSize() {
|
public int getPrefetchSize() {
|
||||||
|
|
|
@ -0,0 +1,179 @@
|
||||||
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.Connection;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
import org.apache.activemq.command.ConsumerControl;
|
||||||
|
import org.apache.activemq.thread.Scheduler;
|
||||||
|
import org.apache.activemq.transport.InactivityIOException;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
|
||||||
|
*
|
||||||
|
* @org.apache.xbean.XBean
|
||||||
|
*/
|
||||||
|
public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
|
||||||
|
|
||||||
|
private static final Scheduler scheduler = Scheduler.getInstance();
|
||||||
|
private AtomicBoolean taskStarted = new AtomicBoolean(false);
|
||||||
|
private Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
|
||||||
|
|
||||||
|
private long maxSlowCount = -1;
|
||||||
|
private long maxSlowDuration = 30*1000;
|
||||||
|
private long checkPeriod = 30*1000;
|
||||||
|
private boolean abortConnection = false;
|
||||||
|
|
||||||
|
public void slowConsumer(ConnectionContext context, Subscription subs) {
|
||||||
|
if (maxSlowCount < 0 && maxSlowDuration < 0) {
|
||||||
|
// nothing to do
|
||||||
|
LOG.info("no limits set, slowConsumer strategy has nothing to do");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taskStarted.compareAndSet(false, true)) {
|
||||||
|
scheduler.executePeriodically(this, checkPeriod);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!slowConsumers.containsKey(subs)) {
|
||||||
|
slowConsumers.put(subs, new SlowConsumerEntry(context));
|
||||||
|
} else if (maxSlowCount > 0) {
|
||||||
|
slowConsumers.get(subs).slow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
if (maxSlowDuration > 0) {
|
||||||
|
// mark
|
||||||
|
for (SlowConsumerEntry entry : slowConsumers.values()) {
|
||||||
|
entry.mark();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
|
||||||
|
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
|
||||||
|
if (entry.getKey().isSlowConsumer()) {
|
||||||
|
if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
|
||||||
|
|| maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) {
|
||||||
|
toAbort.put(entry.getKey(), entry.getValue());
|
||||||
|
slowConsumers.remove(entry.getKey());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
|
||||||
|
slowConsumers.remove(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
|
||||||
|
ConnectionContext connectionContext = entry.getValue().context;
|
||||||
|
if (connectionContext!= null) {
|
||||||
|
try {
|
||||||
|
LOG.info("aborting "
|
||||||
|
+ (abortConnection ? "connection" : "consumer")
|
||||||
|
+ ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
|
||||||
|
|
||||||
|
final Connection connection = connectionContext.getConnection();
|
||||||
|
if (connection != null) {
|
||||||
|
if (abortConnection) {
|
||||||
|
scheduler.executeAfterDelay(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
|
||||||
|
+ maxSlowCount + ") or too long (>"
|
||||||
|
+ maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
|
||||||
|
}}, 0l);
|
||||||
|
} else {
|
||||||
|
// just abort the consumer by telling it to stop
|
||||||
|
ConsumerControl stopConsumer = new ConsumerControl();
|
||||||
|
stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
|
||||||
|
stopConsumer.setClose(true);
|
||||||
|
connection.dispatchAsync(stopConsumer);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("exception on stopping "
|
||||||
|
+ (abortConnection ? "connection" : "consumer")
|
||||||
|
+ " to abort slow consumer: " + entry.getKey(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxSlowCount() {
|
||||||
|
return maxSlowCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* number of times a subscription can be deemed slow before triggering abort
|
||||||
|
* effect depends on dispatch rate as slow determination is done on dispatch
|
||||||
|
*/
|
||||||
|
public void setMaxSlowCount(int maxSlowCount) {
|
||||||
|
this.maxSlowCount = maxSlowCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxSlowDuration() {
|
||||||
|
return maxSlowDuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* time in milliseconds that a sub can remain slow before triggering
|
||||||
|
* an abort.
|
||||||
|
* @param maxSlowDuration
|
||||||
|
*/
|
||||||
|
public void setMaxSlowDuration(long maxSlowDuration) {
|
||||||
|
this.maxSlowDuration = maxSlowDuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCheckPeriod() {
|
||||||
|
return checkPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* time in milliseconds between checks for slow subscriptions
|
||||||
|
* @param checkPeriod
|
||||||
|
*/
|
||||||
|
public void setCheckPeriod(long checkPeriod) {
|
||||||
|
this.checkPeriod = checkPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAbortConnection() {
|
||||||
|
return abortConnection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* abort the consumers connection rather than sending a stop command to the remote consumer
|
||||||
|
* @param abortConnection
|
||||||
|
*/
|
||||||
|
public void setAbortConnection(boolean abortConnection) {
|
||||||
|
this.abortConnection = abortConnection;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class SlowConsumerEntry {
|
||||||
|
|
||||||
|
final ConnectionContext context;
|
||||||
|
int slowCount = 1;
|
||||||
|
int markCount = 0;
|
||||||
|
|
||||||
|
SlowConsumerEntry(ConnectionContext context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void slow() {
|
||||||
|
slowCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void mark() {
|
||||||
|
markCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -86,6 +86,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
private boolean usePrefetchExtension = true;
|
private boolean usePrefetchExtension = true;
|
||||||
private int cursorMemoryHighWaterMark = 70;
|
private int cursorMemoryHighWaterMark = 70;
|
||||||
private int storeUsageHighWaterMark = 100;
|
private int storeUsageHighWaterMark = 100;
|
||||||
|
private SlowConsumerStrategy slowConsumerStrategy;
|
||||||
|
|
||||||
|
|
||||||
public void configure(Broker broker,Queue queue) {
|
public void configure(Broker broker,Queue queue) {
|
||||||
|
@ -147,6 +148,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
destination.setMaxExpirePageSize(getMaxExpirePageSize());
|
destination.setMaxExpirePageSize(getMaxExpirePageSize());
|
||||||
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||||
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
|
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
|
||||||
|
destination.setSlowConsumerStrategy(getSlowConsumerStrategy());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||||
|
@ -724,4 +726,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
return storeUsageHighWaterMark;
|
return storeUsageHighWaterMark;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
|
||||||
|
this.slowConsumerStrategy = slowConsumerStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SlowConsumerStrategy getSlowConsumerStrategy() {
|
||||||
|
return this.slowConsumerStrategy;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* a strategy for dealing with slow consumers
|
||||||
|
*/
|
||||||
|
public interface SlowConsumerStrategy {
|
||||||
|
|
||||||
|
void slowConsumer(ConnectionContext context, Subscription subs);
|
||||||
|
|
||||||
|
}
|
|
@ -61,7 +61,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
||||||
|
|
||||||
protected boolean useConcurrentSend = true;
|
protected boolean useConcurrentSend = true;
|
||||||
protected boolean durable;
|
protected boolean durable;
|
||||||
protected boolean topic;
|
public boolean topic;
|
||||||
protected boolean persistent;
|
protected boolean persistent;
|
||||||
|
|
||||||
protected BrokerService broker;
|
protected BrokerService broker;
|
||||||
|
@ -115,6 +115,7 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
|
protected void sendMessages(Connection connection, Destination destination, int count) throws Exception {
|
||||||
|
connections.add(connection);
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -195,6 +196,9 @@ public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
|
||||||
|
|
||||||
protected ActiveMQDestination createDestination() throws JMSException {
|
protected ActiveMQDestination createDestination() throws JMSException {
|
||||||
String name = "." + getClass().getName() + "." + getName();
|
String name = "." + getClass().getName() + "." + getName();
|
||||||
|
// ensure not inadvertently composite because of combos
|
||||||
|
name = name.replace(' ','_');
|
||||||
|
name = name.replace(',','&');
|
||||||
if (topic) {
|
if (topic) {
|
||||||
destination = new ActiveMQTopic("Topic" + name);
|
destination = new ActiveMQTopic("Topic" + name);
|
||||||
return (ActiveMQDestination)destination;
|
return (ActiveMQDestination)destination;
|
||||||
|
|
|
@ -52,7 +52,6 @@ import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStorag
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.util.ThreadTracker;
|
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -82,7 +81,6 @@ public class MessageEvictionTest {
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
ThreadTracker.result();
|
|
||||||
connection.stop();
|
connection.stop();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
}
|
}
|
||||||
|
@ -155,6 +153,7 @@ public class MessageEvictionTest {
|
||||||
|
|
||||||
ExecutorService executor = Executors.newCachedThreadPool();
|
ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
final CountDownLatch doAck = new CountDownLatch(1);
|
final CountDownLatch doAck = new CountDownLatch(1);
|
||||||
|
final CountDownLatch ackDone = new CountDownLatch(1);
|
||||||
final CountDownLatch consumerRegistered = new CountDownLatch(1);
|
final CountDownLatch consumerRegistered = new CountDownLatch(1);
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -167,15 +166,18 @@ public class MessageEvictionTest {
|
||||||
doAck.await(60, TimeUnit.SECONDS);
|
doAck.await(60, TimeUnit.SECONDS);
|
||||||
LOG.info("acking: " + message.getJMSMessageID());
|
LOG.info("acking: " + message.getJMSMessageID());
|
||||||
message.acknowledge();
|
message.acknowledge();
|
||||||
|
ackDone.countDown();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
consumerRegistered.countDown();
|
|
||||||
fail(e.toString());
|
fail(e.toString());
|
||||||
|
} finally {
|
||||||
|
consumerRegistered.countDown();
|
||||||
|
ackDone.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
consumerRegistered.countDown();
|
consumerRegistered.countDown();
|
||||||
doAck.await(60, TimeUnit.SECONDS);
|
ackDone.await(60, TimeUnit.SECONDS);
|
||||||
consumer.close();
|
consumer.close();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -256,7 +258,8 @@ public class MessageEvictionTest {
|
||||||
|
|
||||||
// to keep the limit in check and up to date rather than just the first few, evict some
|
// to keep the limit in check and up to date rather than just the first few, evict some
|
||||||
OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
|
OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
|
||||||
messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100);
|
// whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test
|
||||||
|
//messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000);
|
||||||
entry.setMessageEvictionStrategy(messageEvictionStrategy);
|
entry.setMessageEvictionStrategy(messageEvictionStrategy);
|
||||||
|
|
||||||
// let evicted messaged disappear
|
// let evicted messaged disappear
|
||||||
|
|
|
@ -0,0 +1,224 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ExceptionListener;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import junit.framework.Test;
|
||||||
|
|
||||||
|
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.util.MessageIdList;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
|
||||||
|
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
|
||||||
|
|
||||||
|
AbortSlowConsumerStrategy underTest;
|
||||||
|
|
||||||
|
public boolean abortConnection = false;
|
||||||
|
public long checkPeriod = 2*1000;
|
||||||
|
public long maxSlowDuration = 5*1000;
|
||||||
|
|
||||||
|
private List<Throwable> exceptions = new ArrayList<Throwable>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
exceptions.clear();
|
||||||
|
topic = true;
|
||||||
|
underTest = new AbortSlowConsumerStrategy();
|
||||||
|
super.setUp();
|
||||||
|
createDestination();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService broker = super.createBroker();
|
||||||
|
PolicyEntry policy = new PolicyEntry();
|
||||||
|
underTest.setAbortConnection(abortConnection);
|
||||||
|
underTest.setCheckPeriod(checkPeriod);
|
||||||
|
underTest.setMaxSlowDuration(maxSlowDuration);
|
||||||
|
|
||||||
|
policy.setSlowConsumerStrategy(underTest);
|
||||||
|
policy.setQueuePrefetch(10);
|
||||||
|
policy.setTopicPrefetch(10);
|
||||||
|
PolicyMap pMap = new PolicyMap();
|
||||||
|
pMap.setDefaultEntry(policy);
|
||||||
|
broker.setDestinationPolicy(pMap);
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRegularConsumerIsNotAborted() throws Exception {
|
||||||
|
startConsumers(destination);
|
||||||
|
for (Connection c: connections) {
|
||||||
|
c.setExceptionListener(this);
|
||||||
|
}
|
||||||
|
startProducers(destination, 100);
|
||||||
|
allMessagesList.waitForMessagesToArrive(10);
|
||||||
|
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestLittleSlowConsumerIsNotAborted() {
|
||||||
|
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLittleSlowConsumerIsNotAborted() throws Exception {
|
||||||
|
startConsumers(destination);
|
||||||
|
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||||
|
consumertoAbort.getValue().setProcessingDelay(500);
|
||||||
|
for (Connection c: connections) {
|
||||||
|
c.setExceptionListener(this);
|
||||||
|
}
|
||||||
|
startProducers(destination, 12);
|
||||||
|
allMessagesList.waitForMessagesToArrive(10);
|
||||||
|
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void initCombosForTestSlowConsumerIsAborted() {
|
||||||
|
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSlowConsumerIsAborted() throws Exception {
|
||||||
|
startConsumers(destination);
|
||||||
|
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||||
|
consumertoAbort.getValue().setProcessingDelay(8*1000);
|
||||||
|
for (Connection c: connections) {
|
||||||
|
c.setExceptionListener(this);
|
||||||
|
}
|
||||||
|
startProducers(destination, 100);
|
||||||
|
|
||||||
|
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
|
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
|
||||||
|
consumerCount = 10;
|
||||||
|
startConsumers(destination);
|
||||||
|
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
|
||||||
|
consumertoAbort.getValue().setProcessingDelay(8*1000);
|
||||||
|
for (Connection c: connections) {
|
||||||
|
c.setExceptionListener(this);
|
||||||
|
}
|
||||||
|
startProducers(destination, 100);
|
||||||
|
|
||||||
|
allMessagesList.waitForMessagesToArrive(99);
|
||||||
|
allMessagesList.assertAtLeastMessagesReceived(99);
|
||||||
|
|
||||||
|
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
|
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAbortAlreadyClosingConsumers() throws Exception {
|
||||||
|
consumerCount = 1;
|
||||||
|
startConsumers(destination);
|
||||||
|
for (MessageIdList list : consumers.values()) {
|
||||||
|
list.setProcessingDelay(6*1000);
|
||||||
|
}
|
||||||
|
for (Connection c: connections) {
|
||||||
|
c.setExceptionListener(this);
|
||||||
|
}
|
||||||
|
startProducers(destination, 100);
|
||||||
|
allMessagesList.waitForMessagesToArrive(consumerCount);
|
||||||
|
|
||||||
|
for (MessageConsumer consumer : consumers.keySet()) {
|
||||||
|
LOG.info("closing consumer: " + consumer);
|
||||||
|
/// will block waiting for on message till 6secs expire
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestAbortAlreadyClosedConsumers() {
|
||||||
|
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAbortAlreadyClosedConsumers() throws Exception {
|
||||||
|
Connection conn = createConnectionFactory().createConnection();
|
||||||
|
conn.setExceptionListener(this);
|
||||||
|
connections.add(conn);
|
||||||
|
|
||||||
|
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
final MessageConsumer consumer = sess.createConsumer(destination);
|
||||||
|
conn.start();
|
||||||
|
startProducers(destination, 20);
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
LOG.info("closing consumer: " + consumer);
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void initCombosForTestAbortAlreadyClosedConnection() {
|
||||||
|
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAbortAlreadyClosedConnection() throws Exception {
|
||||||
|
Connection conn = createConnectionFactory().createConnection();
|
||||||
|
conn.setExceptionListener(this);
|
||||||
|
|
||||||
|
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
sess.createConsumer(destination);
|
||||||
|
conn.start();
|
||||||
|
startProducers(destination, 20);
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
LOG.info("closing connection: " + conn);
|
||||||
|
conn.close();
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAbortConsumerOnDeadConnection() throws Exception {
|
||||||
|
// socket proxy on pause, close could hang??
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onException(JMSException exception) {
|
||||||
|
exceptions.add(exception);
|
||||||
|
exception.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Test suite() {
|
||||||
|
return suite(AbortSlowConsumerTest.class);
|
||||||
|
}
|
||||||
|
}
|
|
@ -291,8 +291,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCursorMemoryHighWaterMark(
|
public void setCursorMemoryHighWaterMark(
|
||||||
int cursorMemoryHighWaterMark) {
|
int cursorMemoryHighWaterMark) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSlowConsumer() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
queue.addSubscription(contextNotInTx, subscription);
|
queue.addSubscription(contextNotInTx, subscription);
|
||||||
|
|
Loading…
Reference in New Issue