New SlowConsumerStrategy implementation for aborting consumers that haven't ack'd in the configured interval.  Can also be used to kick idle consumers if you disable the ignore idle consumers option.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504231 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-07-17 18:44:27 +00:00
parent de49f2e873
commit 2b99f39a5b
11 changed files with 529 additions and 86 deletions

View File

@ -20,14 +20,17 @@ import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.LogicExpression; import org.apache.activemq.filter.LogicExpression;
@ -49,7 +52,7 @@ public abstract class AbstractSubscription implements Subscription {
private ObjectName objectName; private ObjectName objectName;
private int cursorMemoryHighWaterMark = 70; private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer; private boolean slowConsumer;
private long lastAckTime;
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker; this.broker = broker;
@ -57,6 +60,7 @@ public abstract class AbstractSubscription implements Subscription {
this.info = info; this.info = info;
this.destinationFilter = DestinationFilter.parseFilter(info.getDestination()); this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
this.selectorExpression = parseSelector(info); this.selectorExpression = parseSelector(info);
this.lastAckTime = System.currentTimeMillis();
} }
private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
@ -81,6 +85,12 @@ public abstract class AbstractSubscription implements Subscription {
return rc; return rc;
} }
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
}
@Override
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
ConsumerId targetConsumerId = node.getTargetConsumerId(); ConsumerId targetConsumerId = node.getTargetConsumerId();
if (targetConsumerId != null) { if (targetConsumerId != null) {
@ -96,26 +106,32 @@ public abstract class AbstractSubscription implements Subscription {
} }
} }
@Override
public boolean matches(ActiveMQDestination destination) { public boolean matches(ActiveMQDestination destination) {
return destinationFilter.matches(destination); return destinationFilter.matches(destination);
} }
@Override
public void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context, Destination destination) throws Exception {
destinations.add(destination); destinations.add(destination);
} }
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
destinations.remove(destination); destinations.remove(destination);
return Collections.EMPTY_LIST; return Collections.EMPTY_LIST;
} }
@Override
public ConsumerInfo getConsumerInfo() { public ConsumerInfo getConsumerInfo() {
return info; return info;
} }
@Override
public void gc() { public void gc() {
} }
@Override
public ConnectionContext getContext() { public ConnectionContext getContext() {
return context; return context;
} }
@ -128,10 +144,12 @@ public abstract class AbstractSubscription implements Subscription {
return selectorExpression; return selectorExpression;
} }
@Override
public String getSelector() { public String getSelector() {
return info.getSelector(); return info.getSelector();
} }
@Override
public void setSelector(String selector) throws InvalidSelectorException { public void setSelector(String selector) throws InvalidSelectorException {
ConsumerInfo copy = info.copy(); ConsumerInfo copy = info.copy();
copy.setSelector(selector); copy.setSelector(selector);
@ -141,14 +159,17 @@ public abstract class AbstractSubscription implements Subscription {
this.selectorExpression = newSelector; this.selectorExpression = newSelector;
} }
@Override
public ObjectName getObjectName() { public ObjectName getObjectName() {
return objectName; return objectName;
} }
@Override
public void setObjectName(ObjectName objectName) { public void setObjectName(ObjectName objectName) {
this.objectName = objectName; this.objectName = objectName;
} }
@Override
public int getPrefetchSize() { public int getPrefetchSize() {
return info.getPrefetchSize(); return info.getPrefetchSize();
} }
@ -156,18 +177,21 @@ public abstract class AbstractSubscription implements Subscription {
info.setPrefetchSize(newSize); info.setPrefetchSize(newSize);
} }
@Override
public boolean isRecoveryRequired() { public boolean isRecoveryRequired() {
return true; return true;
} }
@Override
public boolean isSlowConsumer() { public boolean isSlowConsumer() {
return slowConsumer; return slowConsumer;
} }
public void setSlowConsumer(boolean val) { public void setSlowConsumer(boolean val) {
slowConsumer = val; slowConsumer = val;
} }
@Override
public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
boolean result = false; boolean result = false;
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
@ -186,50 +210,56 @@ public abstract class AbstractSubscription implements Subscription {
return result; return result;
} }
@Override
public ActiveMQDestination getActiveMQDestination() { public ActiveMQDestination getActiveMQDestination() {
return info != null ? info.getDestination() : null; return info != null ? info.getDestination() : null;
} }
@Override
public boolean isBrowser() { public boolean isBrowser() {
return info != null && info.isBrowser(); return info != null && info.isBrowser();
} }
@Override
public int getInFlightUsage() { public int getInFlightUsage() {
if (info.getPrefetchSize() > 0) { if (info.getPrefetchSize() > 0) {
return (getInFlightSize() * 100)/info.getPrefetchSize(); return (getInFlightSize() * 100)/info.getPrefetchSize();
} }
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
} }
/** /**
* Add a destination * Add a destination
* @param destination * @param destination
*/ */
public void addDestination(Destination destination) { public void addDestination(Destination destination) {
} }
/** /**
* Remove a destination * Remove a destination
* @param destination * @param destination
*/ */
public void removeDestination(Destination destination) { public void removeDestination(Destination destination) {
}
public int getCursorMemoryHighWaterMark(){
return this.cursorMemoryHighWaterMark;
} }
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ @Override
this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; public int getCursorMemoryHighWaterMark(){
} return this.cursorMemoryHighWaterMark;
}
@Override
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
}
@Override
public int countBeforeFull() { public int countBeforeFull() {
return getDispatchedQueueSize() - info.getPrefetchSize(); return getDispatchedQueueSize() - info.getPrefetchSize();
} }
@Override
public void unmatched(MessageReference node) throws IOException { public void unmatched(MessageReference node) throws IOException {
// only durable topic subs have something to do here // only durable topic subs have something to do here
} }
@ -237,4 +267,13 @@ public abstract class AbstractSubscription implements Subscription {
protected void doAddRecoveredMessage(MessageReference message) throws Exception { protected void doAddRecoveredMessage(MessageReference message) throws Exception {
add(message); add(message);
} }
@Override
public long getTimeOfLastMessageAck() {
return lastAckTime;
}
public void setTimeOfLastMessageAck(long value) {
this.lastAckTime = value;
}
} }

View File

@ -310,6 +310,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override @Override
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
this.setTimeOfLastMessageAck(System.currentTimeMillis());
Destination regionDestination = (Destination) node.getRegionDestination(); Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.acknowledge(context, this, ack, node); regionDestination.acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId()); redeliveredMessages.remove(node.getMessageId());

View File

@ -47,6 +47,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
*/ */
@Override @Override
protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
this.setTimeOfLastMessageAck(System.currentTimeMillis());
final Destination q = (Destination) n.getRegionDestination(); final Destination q = (Destination) n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n; final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q; final Queue queue = (Queue)q;

View File

@ -32,27 +32,26 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
/** /**
* *
*/ */
public interface Subscription extends SubscriptionRecovery { public interface Subscription extends SubscriptionRecovery {
/** /**
* Used to add messages that match the subscription. * Used to add messages that match the subscription.
* @param node * @param node
* @throws Exception * @throws Exception
* @throws InterruptedException * @throws InterruptedException
* @throws IOException * @throws IOException
*/ */
void add(MessageReference node) throws Exception; void add(MessageReference node) throws Exception;
/** /**
* Used when client acknowledge receipt of dispatched message. * Used when client acknowledge receipt of dispatched message.
* @param node * @param node
* @throws IOException * @throws IOException
* @throws Exception * @throws Exception
*/ */
void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception; void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception;
/** /**
* Allows a consumer to pull a message on demand * Allows a consumer to pull a message on demand
@ -61,36 +60,36 @@ public interface Subscription extends SubscriptionRecovery {
/** /**
* Is the subscription interested in the message? * Is the subscription interested in the message?
* @param node * @param node
* @param context * @param context
* @return * @return
* @throws IOException * @throws IOException
*/ */
boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
/** /**
* Is the subscription interested in messages in the destination? * Is the subscription interested in messages in the destination?
* @param context * @param context
* @return * @return
*/ */
boolean matches(ActiveMQDestination destination); boolean matches(ActiveMQDestination destination);
/** /**
* The subscription will be receiving messages from the destination. * The subscription will be receiving messages from the destination.
* @param context * @param context
* @param destination * @param destination
* @throws Exception * @throws Exception
*/ */
void add(ConnectionContext context, Destination destination) throws Exception; void add(ConnectionContext context, Destination destination) throws Exception;
/** /**
* The subscription will be no longer be receiving messages from the destination. * The subscription will be no longer be receiving messages from the destination.
* @param context * @param context
* @param destination * @param destination
* @return a list of un-acked messages that were added to the subscription. * @return a list of un-acked messages that were added to the subscription.
*/ */
List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
/** /**
* The ConsumerInfo object that created the subscription. * The ConsumerInfo object that created the subscription.
* @param destination * @param destination
@ -102,11 +101,11 @@ public interface Subscription extends SubscriptionRecovery {
* reclaim memory. * reclaim memory.
*/ */
void gc(); void gc();
/** /**
* Used by a Slave Broker to update dispatch infomation * Used by a Slave Broker to update dispatch infomation
* @param mdn * @param mdn
* @throws Exception * @throws Exception
*/ */
void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception;
@ -114,17 +113,17 @@ public interface Subscription extends SubscriptionRecovery {
* @return number of messages pending delivery * @return number of messages pending delivery
*/ */
int getPendingQueueSize(); int getPendingQueueSize();
/** /**
* @return number of messages dispatched to the client * @return number of messages dispatched to the client
*/ */
int getDispatchedQueueSize(); int getDispatchedQueueSize();
/** /**
* @return number of messages dispatched to the client * @return number of messages dispatched to the client
*/ */
long getDispatchedCounter(); long getDispatchedCounter();
/** /**
* @return number of messages that matched the subscription * @return number of messages that matched the subscription
*/ */
@ -139,7 +138,7 @@ public interface Subscription extends SubscriptionRecovery {
* @return the JMS selector on the current subscription * @return the JMS selector on the current subscription
*/ */
String getSelector(); String getSelector();
/** /**
* Attempts to change the current active selector on the subscription. * Attempts to change the current active selector on the subscription.
* This operation is not supported for persistent topics. * This operation is not supported for persistent topics.
@ -155,29 +154,28 @@ public interface Subscription extends SubscriptionRecovery {
* Set when the subscription is registered in JMX * Set when the subscription is registered in JMX
*/ */
void setObjectName(ObjectName objectName); void setObjectName(ObjectName objectName);
/** /**
* @return true when 60% or more room is left for dispatching messages * @return true when 60% or more room is left for dispatching messages
*/ */
boolean isLowWaterMark(); boolean isLowWaterMark();
/** /**
* @return true when 10% or less room is left for dispatching messages * @return true when 10% or less room is left for dispatching messages
*/ */
boolean isHighWaterMark(); boolean isHighWaterMark();
/** /**
* @return true if there is no space to dispatch messages * @return true if there is no space to dispatch messages
*/ */
boolean isFull(); boolean isFull();
/** /**
* inform the MessageConsumer on the client to change it's prefetch * inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch * @param newPrefetch
*/ */
void updateConsumerPrefetch(int newPrefetch); void updateConsumerPrefetch(int newPrefetch);
/** /**
* Called when the subscription is destroyed. * Called when the subscription is destroyed.
*/ */
@ -187,17 +185,17 @@ public interface Subscription extends SubscriptionRecovery {
* @return the prefetch size that is configured for the subscription * @return the prefetch size that is configured for the subscription
*/ */
int getPrefetchSize(); int getPrefetchSize();
/** /**
* @return the number of messages awaiting acknowledgement * @return the number of messages awaiting acknowledgement
*/ */
int getInFlightSize(); int getInFlightSize();
/** /**
* @return the in flight messages as a percentage of the prefetch size * @return the in flight messages as a percentage of the prefetch size
*/ */
int getInFlightUsage(); int getInFlightUsage();
/** /**
* Informs the Broker if the subscription needs to intervention to recover it's state * Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do * e.g. DurableTopicSubscriber may do
@ -205,25 +203,35 @@ public interface Subscription extends SubscriptionRecovery {
* @return true if recovery required * @return true if recovery required
*/ */
boolean isRecoveryRequired(); boolean isRecoveryRequired();
/** /**
* @return true if a browser * @return true if a browser
*/ */
boolean isBrowser(); boolean isBrowser();
/** /**
* @return the number of messages this subscription can accept before its full * @return the number of messages this subscription can accept before its full
*/ */
int countBeforeFull(); int countBeforeFull();
ConnectionContext getContext(); ConnectionContext getContext();
public int getCursorMemoryHighWaterMark(); public int getCursorMemoryHighWaterMark();
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
boolean isSlowConsumer(); boolean isSlowConsumer();
void unmatched(MessageReference node) throws IOException; void unmatched(MessageReference node) throws IOException;
/**
* Returns the time since the last Ack message was received by this subscription.
*
* If there has never been an ack this value should be set to the creation time of the
* subscription.
*
* @return time of last received Ack message or Subscription create time if no Acks.
*/
long getTimeOfLastMessageAck();
} }

View File

@ -261,6 +261,8 @@ public class TopicSubscription extends AbstractSubscription {
@Override @Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
super.acknowledge(context, ack);
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
if (context.isInTransaction()) { if (context.isInTransaction()) {

View File

@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abort slow consumers when they reach the configured threshold of slowness,
*
* default is that a consumer that has not Ack'd a message for 30 seconds is slow.
*
* @org.apache.xbean.XBean
*/
public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerStrategy.class);
private final List<Destination> destinations = new LinkedList<Destination>();
private long maxTimeSinceLastAck = 30*1000;
private boolean ignoreIdleConsumers = true;
public AbortSlowAckConsumerStrategy() {
this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
}
@Override
public void setBrokerService(Broker broker) {
super.setBrokerService(broker);
// Task starts right away since we may not receive any slow consumer events.
if (taskStarted.compareAndSet(false, true)) {
scheduler.executePeriodically(this, getCheckPeriod());
}
}
@Override
public void slowConsumer(ConnectionContext context, Subscription subs) {
// Ignore these events, we just look at time since last Ack.
}
@Override
public void run() {
if (maxTimeSinceLastAck < 0) {
// nothing to do
LOG.info("no limit set, slowConsumer strategy has nothing to do");
return;
}
if (getMaxSlowDuration() > 0) {
// For subscriptions that are already slow we mark them again and check below if
// they've exceeded their configured lifetime.
for (SlowConsumerEntry entry : slowConsumers.values()) {
entry.mark();
}
}
List<Destination> disposed = new ArrayList<Destination>();
for (Destination destination : destinations) {
if (destination.isDisposed()) {
disposed.add(destination);
continue;
}
// Not explicitly documented but this returns a stable copy.
List<Subscription> subscribers = destination.getConsumers();
updateSlowConsumersList(subscribers);
}
// Clean up an disposed destinations to save space.
destinations.removeAll(disposed);
abortAllQualifiedSlowConsumers();
}
private void updateSlowConsumersList(List<Subscription> subscribers) {
for (Subscription subscriber : subscribers) {
if (isIgnoreIdleConsumers() && subscriber.getDispatchedQueueSize() == 0) {
// Not considered Idle so ensure its cleared from the list
if (slowConsumers.remove(subscriber) != null) {
LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
}
}
long lastAckTime = subscriber.getTimeOfLastMessageAck();
long timeDelta = System.currentTimeMillis() - lastAckTime;
if (timeDelta > maxTimeSinceLastAck) {
if (!slowConsumers.containsKey(subscriber)) {
if (LOG.isDebugEnabled()) {
LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId());
}
slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext()));
} else if (getMaxSlowCount() > 0) {
slowConsumers.get(subscriber).slow();
}
} else {
if (slowConsumers.remove(subscriber) != null) {
LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
}
}
}
}
private void abortAllQualifiedSlowConsumers() {
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
if (entry.getKey().isSlowConsumer()) {
if (getMaxSlowDuration() > 0 &&
(entry.getValue().markCount * getCheckPeriod() > getMaxSlowDuration()) ||
getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount()) {
toAbort.put(entry.getKey(), entry.getValue());
slowConsumers.remove(entry.getKey());
}
}
}
// Now if any subscriptions made it into the aborts list we can kick them.
abortSubscription(toAbort, isAbortConnection());
}
@Override
public void addDestination(Destination destination) {
this.destinations.add(destination);
}
/**
* Gets the maximum time since last Ack before a subscription is considered to be slow.
*
* @return the maximum time since last Ack before the consumer is considered to be slow.
*/
public long getMaxTimeSinceLastAck() {
return maxTimeSinceLastAck;
}
/**
* Sets the maximum time since last Ack before a subscription is considered to be slow.
*
* @param maxTimeSinceLastAck
* the maximum time since last Ack (mills) before the consumer is considered to be slow.
*/
public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) {
this.maxTimeSinceLastAck = maxTimeSinceLastAck;
}
/**
* Returns whether the strategy is configured to ignore consumers that are simply idle, i.e
* consumers that have no pending acks (dispatch queue is empty).
*
* @return true if the strategy will ignore idle consumer when looking for slow consumers.
*/
public boolean isIgnoreIdleConsumers() {
return ignoreIdleConsumers;
}
/**
* Sets whether the strategy is configured to ignore consumers that are simply idle, i.e
* consumers that have no pending acks (dispatch queue is empty).
*
* When configured to not ignore idle consumers this strategy acks not only on consumers
* that are actually slow but also on any consumer that has not received any messages for
* the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also
* aborting slow consumers.
*
* @param ignoreIdleConsumers
* Should this strategy ignore idle consumers or consider all consumers when checking
* the last ack time verses the maxTimeSinceLastAck value.
*/
public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
this.ignoreIdleConsumers = ignoreIdleConsumers;
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
@ -34,40 +35,43 @@ import org.slf4j.LoggerFactory;
/** /**
* Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*/ */
public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable { public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class); private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
private String name = "AbortSlowConsumerStrategy@" + hashCode(); protected String name = "AbortSlowConsumerStrategy@" + hashCode();
private Scheduler scheduler; protected Scheduler scheduler;
private Broker broker; protected Broker broker;
private final AtomicBoolean taskStarted = new AtomicBoolean(false); protected final AtomicBoolean taskStarted = new AtomicBoolean(false);
private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>(); protected final Map<Subscription, SlowConsumerEntry> slowConsumers =
new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
private long maxSlowCount = -1; private long maxSlowCount = -1;
private long maxSlowDuration = 30*1000; private long maxSlowDuration = 30*1000;
private long checkPeriod = 30*1000; private long checkPeriod = 30*1000;
private boolean abortConnection = false; private boolean abortConnection = false;
@Override
public void setBrokerService(Broker broker) { public void setBrokerService(Broker broker) {
this.scheduler = broker.getScheduler(); this.scheduler = broker.getScheduler();
this.broker = broker; this.broker = broker;
} }
@Override
public void slowConsumer(ConnectionContext context, Subscription subs) { public void slowConsumer(ConnectionContext context, Subscription subs) {
if (maxSlowCount < 0 && maxSlowDuration < 0) { if (maxSlowCount < 0 && maxSlowDuration < 0) {
// nothing to do // nothing to do
LOG.info("no limits set, slowConsumer strategy has nothing to do"); LOG.info("no limits set, slowConsumer strategy has nothing to do");
return; return;
} }
if (taskStarted.compareAndSet(false, true)) { if (taskStarted.compareAndSet(false, true)) {
scheduler.executePeriodically(this, checkPeriod); scheduler.executePeriodically(this, checkPeriod);
} }
if (!slowConsumers.containsKey(subs)) { if (!slowConsumers.containsKey(subs)) {
slowConsumers.put(subs, new SlowConsumerEntry(context)); slowConsumers.put(subs, new SlowConsumerEntry(context));
} else if (maxSlowCount > 0) { } else if (maxSlowCount > 0) {
@ -75,6 +79,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
} }
} }
@Override
public void run() { public void run() {
if (maxSlowDuration > 0) { if (maxSlowDuration > 0) {
// mark // mark
@ -82,12 +87,12 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
entry.mark(); entry.mark();
} }
} }
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>(); HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) { for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
if (entry.getKey().isSlowConsumer()) { if (entry.getKey().isSlowConsumer()) {
if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration) if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
|| maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) {
toAbort.put(entry.getKey(), entry.getValue()); toAbort.put(entry.getKey(), entry.getValue());
slowConsumers.remove(entry.getKey()); slowConsumers.remove(entry.getKey());
} }
@ -100,19 +105,20 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
abortSubscription(toAbort, abortConnection); abortSubscription(toAbort, abortConnection);
} }
private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) { protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) { for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
ConnectionContext connectionContext = entry.getValue().context; ConnectionContext connectionContext = entry.getValue().context;
if (connectionContext!= null) { if (connectionContext!= null) {
try { try {
LOG.info("aborting " LOG.info("aborting "
+ (abortSubscriberConnection ? "connection" : "consumer") + (abortSubscriberConnection ? "connection" : "consumer")
+ ", slow consumer: " + entry.getKey()); + ", slow consumer: " + entry.getKey());
final Connection connection = connectionContext.getConnection(); final Connection connection = connectionContext.getConnection();
if (connection != null) { if (connection != null) {
if (abortSubscriberConnection) { if (abortSubscriberConnection) {
scheduler.executeAfterDelay(new Runnable() { scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() { public void run() {
connection.serviceException(new InactivityIOException("Consumer was slow too often (>" connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
+ maxSlowCount + ") or too long (>" + maxSlowCount + ") or too long (>"
@ -137,12 +143,11 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
} }
} }
public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) { public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
if (sub != null) { if (sub != null) {
SlowConsumerEntry entry = slowConsumers.remove(sub); SlowConsumerEntry entry = slowConsumers.remove(sub);
if (entry != null) { if (entry != null) {
Map toAbort = new HashMap<Subscription, SlowConsumerEntry>(); Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
toAbort.put(sub, entry); toAbort.put(sub, entry);
abortSubscription(toAbort, abortSubscriberConnection); abortSubscription(toAbort, abortSubscriberConnection);
} else { } else {
@ -151,7 +156,6 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
} }
} }
public long getMaxSlowCount() { public long getMaxSlowCount() {
return maxSlowCount; return maxSlowCount;
} }
@ -204,7 +208,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
public void setName(String name) { public void setName(String name) {
this.name = name; this.name = name;
} }
public String getName() { public String getName() {
return name; return name;
} }
@ -212,4 +216,9 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
public Map<Subscription, SlowConsumerEntry> getSlowConsumers() { public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
return slowConsumers; return slowConsumers;
} }
@Override
public void addDestination(Destination destination) {
// Not needed for this strategy.
}
} }

View File

@ -145,7 +145,7 @@ public class PolicyEntry extends DestinationMapEntry {
topic.setLazyDispatch(isLazyDispatch()); topic.setLazyDispatch(isLazyDispatch());
} }
public void baseConfiguration(Broker broker,BaseDestination destination) { public void baseConfiguration(Broker broker, BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl()); destination.setProducerFlowControl(isProducerFlowControl());
destination.setAlwaysRetroactive(isAlwaysRetroactive()); destination.setAlwaysRetroactive(isAlwaysRetroactive());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
@ -170,6 +170,7 @@ public class PolicyEntry extends DestinationMapEntry {
SlowConsumerStrategy scs = getSlowConsumerStrategy(); SlowConsumerStrategy scs = getSlowConsumerStrategy();
if (scs != null) { if (scs != null) {
scs.setBrokerService(broker); scs.setBrokerService(broker);
scs.addDestination(destination);
} }
destination.setSlowConsumerStrategy(scs); destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages()); destination.setPrioritizedMessages(isPrioritizedMessages());
@ -179,7 +180,6 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
} }
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {

View File

@ -18,13 +18,41 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
/* /**
* a strategy for dealing with slow consumers * Interface for a strategy for dealing with slow consumers
*/ */
public interface SlowConsumerStrategy { public interface SlowConsumerStrategy {
/**
* Slow consumer event.
*
* @param context
* Connection context of the subscription.
* @param subs
* The subscription object for the slow consumer.
*/
void slowConsumer(ConnectionContext context, Subscription subs); void slowConsumer(ConnectionContext context, Subscription subs);
/**
* Sets the Broker instance which can provide a Scheduler among other things.
*
* @param broker
* The running Broker.
*/
void setBrokerService(Broker broker); void setBrokerService(Broker broker);
/**
* For Strategies that need to examine assigned destination for slow consumers
* periodically the destination is assigned here.
*
* If the strategy doesn't is event driven it can just ignore assigned destination.
*
* @param destination
* A destination to add to a watch list.
*/
void addDestination(Destination destination);
} }

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest {
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class);
protected long maxTimeSinceLastAck = 5 * 1000;
@Override
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
return new AbortSlowConsumerStrategy();
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
PolicyEntry policy = new PolicyEntry();
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
strategy.setAbortConnection(abortConnection);
strategy.setCheckPeriod(checkPeriod);
strategy.setMaxSlowDuration(maxSlowDuration);
strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
policy.setSlowConsumerStrategy(strategy);
policy.setQueuePrefetch(10);
policy.setTopicPrefetch(10);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker;
}
@Override
protected ConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.getPrefetchPolicy().setAll(1);
return factory;
}
@Override
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort
super.testSlowConsumerIsAbortedViaJmx();
}
@Override
public void initCombosForTestSlowConsumerIsAborted() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testZeroPrefetchConsumerIsAborted() throws Exception {
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
conn.setExceptionListener(this);
connections.add(conn);
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = sess.createConsumer(destination);
assertNotNull(consumer);
conn.start();
startProducers(destination, 20);
Message message = consumer.receive(5000);
assertNotNull(message);
try {
consumer.receive(20000);
fail("Slow consumer not aborted.");
} catch(Exception ex) {
}
}
public void testIdleConsumerCanBeAbortedNoMessages() throws Exception {
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
strategy.setIgnoreIdleConsumers(false);
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
conn.setExceptionListener(this);
connections.add(conn);
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = sess.createConsumer(destination);
assertNotNull(consumer);
conn.start();
startProducers(destination, 20);
try {
consumer.receive(20000);
fail("Idle consumer not aborted.");
} catch(Exception ex) {
}
}
public void testIdleConsumerCanBeAborted() throws Exception {
AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest;
strategy.setIgnoreIdleConsumers(false);
ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection();
conn.setExceptionListener(this);
connections.add(conn);
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = sess.createConsumer(destination);
assertNotNull(consumer);
conn.start();
startProducers(destination, 20);
Message message = consumer.receive(5000);
assertNotNull(message);
message.acknowledge();
try {
consumer.receive(20000);
fail("Slow consumer not aborted.");
} catch(Exception ex) {
}
}
}

View File

@ -51,23 +51,25 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class); private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class);
AbortSlowConsumerStrategy underTest; protected AbortSlowConsumerStrategy underTest;
protected boolean abortConnection = false;
public boolean abortConnection = false; protected long checkPeriod = 2 * 1000;
public long checkPeriod = 2 * 1000; protected long maxSlowDuration = 5 * 1000;
public long maxSlowDuration = 5 * 1000; protected final List<Throwable> exceptions = new ArrayList<Throwable>();
private final List<Throwable> exceptions = new ArrayList<Throwable>();
@Override @Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
exceptions.clear(); exceptions.clear();
topic = true; topic = true;
underTest = new AbortSlowConsumerStrategy(); underTest = createSlowConsumerStrategy();
super.setUp(); super.setUp();
createDestination(); createDestination();
} }
protected AbortSlowConsumerStrategy createSlowConsumerStrategy() {
return new AbortSlowConsumerStrategy();
}
@Override @Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker(); BrokerService broker = super.createBroker();
@ -244,7 +246,6 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
} }
public void initCombosForTestAbortAlreadyClosedConnection() { public void initCombosForTestAbortAlreadyClosedConnection() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});