From 2b99f39a5be229d8982e9378248c7e32df9147a0 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Wed, 17 Jul 2013 18:44:27 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/jira/browse/AMQ-4621 New SlowConsumerStrategy implementation for aborting consumers that haven't ack'd in the configured interval. Can also be used to kick idle consumers if you disable the ignore idle consumers option. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504231 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/AbstractSubscription.java | 75 +++++-- .../region/DurableTopicSubscription.java | 1 + .../broker/region/QueueSubscription.java | 2 + .../activemq/broker/region/Subscription.java | 84 ++++---- .../broker/region/TopicSubscription.java | 2 + .../policy/AbortSlowAckConsumerStrategy.java | 201 ++++++++++++++++++ .../policy/AbortSlowConsumerStrategy.java | 43 ++-- .../broker/region/policy/PolicyEntry.java | 4 +- .../region/policy/SlowConsumerStrategy.java | 32 ++- .../policy/AbortSlowAckConsumerTest.java | 152 +++++++++++++ .../broker/policy/AbortSlowConsumerTest.java | 19 +- 11 files changed, 529 insertions(+), 86 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 1bf53391ce..e470fabde7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -20,14 +20,17 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; + import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.management.ObjectName; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.LogicExpression; @@ -49,7 +52,7 @@ public abstract class AbstractSubscription implements Subscription { private ObjectName objectName; private int cursorMemoryHighWaterMark = 70; private boolean slowConsumer; - + private long lastAckTime; public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { this.broker = broker; @@ -57,6 +60,7 @@ public abstract class AbstractSubscription implements Subscription { this.info = info; this.destinationFilter = DestinationFilter.parseFilter(info.getDestination()); this.selectorExpression = parseSelector(info); + this.lastAckTime = System.currentTimeMillis(); } private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException { @@ -81,6 +85,12 @@ public abstract class AbstractSubscription implements Subscription { return rc; } + @Override + public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { + this.lastAckTime = System.currentTimeMillis(); + } + + @Override public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { ConsumerId targetConsumerId = node.getTargetConsumerId(); if (targetConsumerId != null) { @@ -96,26 +106,32 @@ public abstract class AbstractSubscription implements Subscription { } } + @Override public boolean matches(ActiveMQDestination destination) { return destinationFilter.matches(destination); } + @Override public void add(ConnectionContext context, Destination destination) throws Exception { destinations.add(destination); } + @Override public List remove(ConnectionContext context, Destination destination) throws Exception { destinations.remove(destination); return Collections.EMPTY_LIST; } + @Override public ConsumerInfo getConsumerInfo() { return info; } + @Override public void gc() { } + @Override public ConnectionContext getContext() { return context; } @@ -128,10 +144,12 @@ public abstract class AbstractSubscription implements Subscription { return selectorExpression; } + @Override public String getSelector() { return info.getSelector(); } + @Override public void setSelector(String selector) throws InvalidSelectorException { ConsumerInfo copy = info.copy(); copy.setSelector(selector); @@ -141,14 +159,17 @@ public abstract class AbstractSubscription implements Subscription { this.selectorExpression = newSelector; } + @Override public ObjectName getObjectName() { return objectName; } + @Override public void setObjectName(ObjectName objectName) { this.objectName = objectName; } + @Override public int getPrefetchSize() { return info.getPrefetchSize(); } @@ -156,18 +177,21 @@ public abstract class AbstractSubscription implements Subscription { info.setPrefetchSize(newSize); } + @Override public boolean isRecoveryRequired() { return true; } - + + @Override public boolean isSlowConsumer() { return slowConsumer; } - + public void setSlowConsumer(boolean val) { slowConsumer = val; } + @Override public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { boolean result = false; MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); @@ -186,50 +210,56 @@ public abstract class AbstractSubscription implements Subscription { return result; } + @Override public ActiveMQDestination getActiveMQDestination() { return info != null ? info.getDestination() : null; } - + + @Override public boolean isBrowser() { return info != null && info.isBrowser(); } - + + @Override public int getInFlightUsage() { if (info.getPrefetchSize() > 0) { return (getInFlightSize() * 100)/info.getPrefetchSize(); } return Integer.MAX_VALUE; } - + /** * Add a destination * @param destination */ public void addDestination(Destination destination) { - + } - - + /** * Remove a destination * @param destination */ public void removeDestination(Destination destination) { - - } - - public int getCursorMemoryHighWaterMark(){ - return this.cursorMemoryHighWaterMark; + } - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ - this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; - } - + @Override + public int getCursorMemoryHighWaterMark(){ + return this.cursorMemoryHighWaterMark; + } + + @Override + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ + this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; + } + + @Override public int countBeforeFull() { return getDispatchedQueueSize() - info.getPrefetchSize(); } + @Override public void unmatched(MessageReference node) throws IOException { // only durable topic subs have something to do here } @@ -237,4 +267,13 @@ public abstract class AbstractSubscription implements Subscription { protected void doAddRecoveredMessage(MessageReference message) throws Exception { add(message); } + + @Override + public long getTimeOfLastMessageAck() { + return lastAckTime; + } + + public void setTimeOfLastMessageAck(long value) { + this.lastAckTime = value; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index c4ef1198bf..25752f269a 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -310,6 +310,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us @Override protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { + this.setTimeOfLastMessageAck(System.currentTimeMillis()); Destination regionDestination = (Destination) node.getRegionDestination(); regionDestination.acknowledge(context, this, ack, node); redeliveredMessages.remove(node.getMessageId()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index d0e69e59af..4996eaa3cb 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -47,6 +47,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner */ @Override protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { + this.setTimeOfLastMessageAck(System.currentTimeMillis()); + final Destination q = (Destination) n.getRegionDestination(); final QueueMessageReference node = (QueueMessageReference)n; final Queue queue = (Queue)q; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index ef996d8c64..dfd427d128 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -32,27 +32,26 @@ import org.apache.activemq.command.Response; import org.apache.activemq.filter.MessageEvaluationContext; /** - * + * */ public interface Subscription extends SubscriptionRecovery { /** * Used to add messages that match the subscription. * @param node - * @throws Exception - * @throws InterruptedException - * @throws IOException + * @throws Exception + * @throws InterruptedException + * @throws IOException */ void add(MessageReference node) throws Exception; - + /** - * Used when client acknowledge receipt of dispatched message. + * Used when client acknowledge receipt of dispatched message. * @param node - * @throws IOException - * @throws Exception + * @throws IOException + * @throws Exception */ void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception; - /** * Allows a consumer to pull a message on demand @@ -61,36 +60,36 @@ public interface Subscription extends SubscriptionRecovery { /** * Is the subscription interested in the message? - * @param node + * @param node * @param context * @return - * @throws IOException + * @throws IOException */ boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; - + /** * Is the subscription interested in messages in the destination? * @param context * @return */ boolean matches(ActiveMQDestination destination); - + /** * The subscription will be receiving messages from the destination. - * @param context + * @param context * @param destination - * @throws Exception + * @throws Exception */ void add(ConnectionContext context, Destination destination) throws Exception; - + /** * The subscription will be no longer be receiving messages from the destination. - * @param context + * @param context * @param destination * @return a list of un-acked messages that were added to the subscription. */ List remove(ConnectionContext context, Destination destination) throws Exception; - + /** * The ConsumerInfo object that created the subscription. * @param destination @@ -102,11 +101,11 @@ public interface Subscription extends SubscriptionRecovery { * reclaim memory. */ void gc(); - + /** * Used by a Slave Broker to update dispatch infomation * @param mdn - * @throws Exception + * @throws Exception */ void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; @@ -114,17 +113,17 @@ public interface Subscription extends SubscriptionRecovery { * @return number of messages pending delivery */ int getPendingQueueSize(); - + /** * @return number of messages dispatched to the client */ int getDispatchedQueueSize(); - + /** * @return number of messages dispatched to the client */ long getDispatchedCounter(); - + /** * @return number of messages that matched the subscription */ @@ -139,7 +138,7 @@ public interface Subscription extends SubscriptionRecovery { * @return the JMS selector on the current subscription */ String getSelector(); - + /** * Attempts to change the current active selector on the subscription. * This operation is not supported for persistent topics. @@ -155,29 +154,28 @@ public interface Subscription extends SubscriptionRecovery { * Set when the subscription is registered in JMX */ void setObjectName(ObjectName objectName); - + /** * @return true when 60% or more room is left for dispatching messages */ boolean isLowWaterMark(); - + /** * @return true when 10% or less room is left for dispatching messages */ boolean isHighWaterMark(); - + /** * @return true if there is no space to dispatch messages */ boolean isFull(); - + /** * inform the MessageConsumer on the client to change it's prefetch * @param newPrefetch */ void updateConsumerPrefetch(int newPrefetch); - - + /** * Called when the subscription is destroyed. */ @@ -187,17 +185,17 @@ public interface Subscription extends SubscriptionRecovery { * @return the prefetch size that is configured for the subscription */ int getPrefetchSize(); - + /** * @return the number of messages awaiting acknowledgement */ int getInFlightSize(); - + /** * @return the in flight messages as a percentage of the prefetch size */ int getInFlightUsage(); - + /** * Informs the Broker if the subscription needs to intervention to recover it's state * e.g. DurableTopicSubscriber may do @@ -205,25 +203,35 @@ public interface Subscription extends SubscriptionRecovery { * @return true if recovery required */ boolean isRecoveryRequired(); - - + /** * @return true if a browser */ boolean isBrowser(); - + /** * @return the number of messages this subscription can accept before its full */ int countBeforeFull(); ConnectionContext getContext(); - + public int getCursorMemoryHighWaterMark(); - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); boolean isSlowConsumer(); void unmatched(MessageReference node) throws IOException; + + /** + * Returns the time since the last Ack message was received by this subscription. + * + * If there has never been an ack this value should be set to the creation time of the + * subscription. + * + * @return time of last received Ack message or Subscription create time if no Acks. + */ + long getTimeOfLastMessageAck(); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 4474f2a234..a21fe3b233 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -261,6 +261,8 @@ public class TopicSubscription extends AbstractSubscription { @Override public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { + super.acknowledge(context, ack); + // Handle the standard acknowledgment case. if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { if (context.isInTransaction()) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java new file mode 100644 index 0000000000..73e8efecb3 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abort slow consumers when they reach the configured threshold of slowness, + * + * default is that a consumer that has not Ack'd a message for 30 seconds is slow. + * + * @org.apache.xbean.XBean + */ +public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerStrategy.class); + + private final List destinations = new LinkedList(); + private long maxTimeSinceLastAck = 30*1000; + private boolean ignoreIdleConsumers = true; + + public AbortSlowAckConsumerStrategy() { + this.name = "AbortSlowAckConsumerStrategy@" + hashCode(); + } + + @Override + public void setBrokerService(Broker broker) { + super.setBrokerService(broker); + + // Task starts right away since we may not receive any slow consumer events. + if (taskStarted.compareAndSet(false, true)) { + scheduler.executePeriodically(this, getCheckPeriod()); + } + } + + @Override + public void slowConsumer(ConnectionContext context, Subscription subs) { + // Ignore these events, we just look at time since last Ack. + } + + @Override + public void run() { + + if (maxTimeSinceLastAck < 0) { + // nothing to do + LOG.info("no limit set, slowConsumer strategy has nothing to do"); + return; + } + + if (getMaxSlowDuration() > 0) { + // For subscriptions that are already slow we mark them again and check below if + // they've exceeded their configured lifetime. + for (SlowConsumerEntry entry : slowConsumers.values()) { + entry.mark(); + } + } + + List disposed = new ArrayList(); + + for (Destination destination : destinations) { + if (destination.isDisposed()) { + disposed.add(destination); + continue; + } + + // Not explicitly documented but this returns a stable copy. + List subscribers = destination.getConsumers(); + + updateSlowConsumersList(subscribers); + } + + // Clean up an disposed destinations to save space. + destinations.removeAll(disposed); + + abortAllQualifiedSlowConsumers(); + } + + private void updateSlowConsumersList(List subscribers) { + for (Subscription subscriber : subscribers) { + + if (isIgnoreIdleConsumers() && subscriber.getDispatchedQueueSize() == 0) { + // Not considered Idle so ensure its cleared from the list + if (slowConsumers.remove(subscriber) != null) { + LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId()); + } + } + + long lastAckTime = subscriber.getTimeOfLastMessageAck(); + long timeDelta = System.currentTimeMillis() - lastAckTime; + + if (timeDelta > maxTimeSinceLastAck) { + if (!slowConsumers.containsKey(subscriber)) { + if (LOG.isDebugEnabled()) { + LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId()); + } + slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext())); + } else if (getMaxSlowCount() > 0) { + slowConsumers.get(subscriber).slow(); + } + } else { + if (slowConsumers.remove(subscriber) != null) { + LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId()); + } + } + } + } + + private void abortAllQualifiedSlowConsumers() { + HashMap toAbort = new HashMap(); + for (Entry entry : slowConsumers.entrySet()) { + if (entry.getKey().isSlowConsumer()) { + if (getMaxSlowDuration() > 0 && + (entry.getValue().markCount * getCheckPeriod() > getMaxSlowDuration()) || + getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount()) { + + toAbort.put(entry.getKey(), entry.getValue()); + slowConsumers.remove(entry.getKey()); + } + } + } + + // Now if any subscriptions made it into the aborts list we can kick them. + abortSubscription(toAbort, isAbortConnection()); + } + + @Override + public void addDestination(Destination destination) { + this.destinations.add(destination); + } + + /** + * Gets the maximum time since last Ack before a subscription is considered to be slow. + * + * @return the maximum time since last Ack before the consumer is considered to be slow. + */ + public long getMaxTimeSinceLastAck() { + return maxTimeSinceLastAck; + } + + /** + * Sets the maximum time since last Ack before a subscription is considered to be slow. + * + * @param maxTimeSinceLastAck + * the maximum time since last Ack (mills) before the consumer is considered to be slow. + */ + public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) { + this.maxTimeSinceLastAck = maxTimeSinceLastAck; + } + + /** + * Returns whether the strategy is configured to ignore consumers that are simply idle, i.e + * consumers that have no pending acks (dispatch queue is empty). + * + * @return true if the strategy will ignore idle consumer when looking for slow consumers. + */ + public boolean isIgnoreIdleConsumers() { + return ignoreIdleConsumers; + } + + /** + * Sets whether the strategy is configured to ignore consumers that are simply idle, i.e + * consumers that have no pending acks (dispatch queue is empty). + * + * When configured to not ignore idle consumers this strategy acks not only on consumers + * that are actually slow but also on any consumer that has not received any messages for + * the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also + * aborting slow consumers. + * + * @param ignoreIdleConsumers + * Should this strategy ignore idle consumers or consider all consumers when checking + * the last ack time verses the maxTimeSinceLastAck value. + */ + public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) { + this.ignoreIdleConsumers = ignoreIdleConsumers; + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java index 653a3578ea..263ca037ce 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.thread.Scheduler; @@ -34,40 +35,43 @@ import org.slf4j.LoggerFactory; /** * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds - * + * * @org.apache.xbean.XBean */ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable { - + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class); - private String name = "AbortSlowConsumerStrategy@" + hashCode(); - private Scheduler scheduler; - private Broker broker; - private final AtomicBoolean taskStarted = new AtomicBoolean(false); - private final Map slowConsumers = new ConcurrentHashMap(); + protected String name = "AbortSlowConsumerStrategy@" + hashCode(); + protected Scheduler scheduler; + protected Broker broker; + protected final AtomicBoolean taskStarted = new AtomicBoolean(false); + protected final Map slowConsumers = + new ConcurrentHashMap(); private long maxSlowCount = -1; private long maxSlowDuration = 30*1000; private long checkPeriod = 30*1000; private boolean abortConnection = false; + @Override public void setBrokerService(Broker broker) { this.scheduler = broker.getScheduler(); this.broker = broker; } + @Override public void slowConsumer(ConnectionContext context, Subscription subs) { if (maxSlowCount < 0 && maxSlowDuration < 0) { // nothing to do LOG.info("no limits set, slowConsumer strategy has nothing to do"); return; } - + if (taskStarted.compareAndSet(false, true)) { scheduler.executePeriodically(this, checkPeriod); } - + if (!slowConsumers.containsKey(subs)) { slowConsumers.put(subs, new SlowConsumerEntry(context)); } else if (maxSlowCount > 0) { @@ -75,6 +79,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable } } + @Override public void run() { if (maxSlowDuration > 0) { // mark @@ -82,12 +87,12 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable entry.mark(); } } - + HashMap toAbort = new HashMap(); for (Entry entry : slowConsumers.entrySet()) { if (entry.getKey().isSlowConsumer()) { if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration) - || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { + || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { toAbort.put(entry.getKey(), entry.getValue()); slowConsumers.remove(entry.getKey()); } @@ -100,19 +105,20 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable abortSubscription(toAbort, abortConnection); } - private void abortSubscription(Map toAbort, boolean abortSubscriberConnection) { + protected void abortSubscription(Map toAbort, boolean abortSubscriberConnection) { for (final Entry entry : toAbort.entrySet()) { ConnectionContext connectionContext = entry.getValue().context; if (connectionContext!= null) { try { LOG.info("aborting " - + (abortSubscriberConnection ? "connection" : "consumer") + + (abortSubscriberConnection ? "connection" : "consumer") + ", slow consumer: " + entry.getKey()); final Connection connection = connectionContext.getConnection(); if (connection != null) { if (abortSubscriberConnection) { scheduler.executeAfterDelay(new Runnable() { + @Override public void run() { connection.serviceException(new InactivityIOException("Consumer was slow too often (>" + maxSlowCount + ") or too long (>" @@ -137,12 +143,11 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable } } - public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) { if (sub != null) { SlowConsumerEntry entry = slowConsumers.remove(sub); if (entry != null) { - Map toAbort = new HashMap(); + Map toAbort = new HashMap(); toAbort.put(sub, entry); abortSubscription(toAbort, abortSubscriberConnection); } else { @@ -151,7 +156,6 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable } } - public long getMaxSlowCount() { return maxSlowCount; } @@ -204,7 +208,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable public void setName(String name) { this.name = name; } - + public String getName() { return name; } @@ -212,4 +216,9 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable public Map getSlowConsumers() { return slowConsumers; } + + @Override + public void addDestination(Destination destination) { + // Not needed for this strategy. + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index b77f430aaa..97f4b3e2b7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -145,7 +145,7 @@ public class PolicyEntry extends DestinationMapEntry { topic.setLazyDispatch(isLazyDispatch()); } - public void baseConfiguration(Broker broker,BaseDestination destination) { + public void baseConfiguration(Broker broker, BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setAlwaysRetroactive(isAlwaysRetroactive()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); @@ -170,6 +170,7 @@ public class PolicyEntry extends DestinationMapEntry { SlowConsumerStrategy scs = getSlowConsumerStrategy(); if (scs != null) { scs.setBrokerService(broker); + scs.addDestination(destination); } destination.setSlowConsumerStrategy(scs); destination.setPrioritizedMessages(isPrioritizedMessages()); @@ -179,7 +180,6 @@ public class PolicyEntry extends DestinationMapEntry { destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); - } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java index d74fe91b1b..7b8ca4cc5c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SlowConsumerStrategy.java @@ -18,13 +18,41 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; -/* - * a strategy for dealing with slow consumers +/** + * Interface for a strategy for dealing with slow consumers */ public interface SlowConsumerStrategy { + /** + * Slow consumer event. + * + * @param context + * Connection context of the subscription. + * @param subs + * The subscription object for the slow consumer. + */ void slowConsumer(ConnectionContext context, Subscription subs); + + /** + * Sets the Broker instance which can provide a Scheduler among other things. + * + * @param broker + * The running Broker. + */ void setBrokerService(Broker broker); + + /** + * For Strategies that need to examine assigned destination for slow consumers + * periodically the destination is assigned here. + * + * If the strategy doesn't is event driven it can just ignore assigned destination. + * + * @param destination + * A destination to add to a watch list. + */ + void addDestination(Destination destination); + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java new file mode 100644 index 0000000000..3ed88e2e7b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumerTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AbortSlowAckConsumerTest extends AbortSlowConsumerTest { + + private static final Logger LOG = LoggerFactory.getLogger(AbortSlowAckConsumerTest.class); + + protected long maxTimeSinceLastAck = 5 * 1000; + + @Override + protected AbortSlowConsumerStrategy createSlowConsumerStrategy() { + return new AbortSlowConsumerStrategy(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PolicyEntry policy = new PolicyEntry(); + + AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest; + strategy.setAbortConnection(abortConnection); + strategy.setCheckPeriod(checkPeriod); + strategy.setMaxSlowDuration(maxSlowDuration); + strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck); + + policy.setSlowConsumerStrategy(strategy); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.getPrefetchPolicy().setAll(1); + return factory; + } + + @Override + public void testSlowConsumerIsAbortedViaJmx() throws Exception { + AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest; + strategy.setMaxTimeSinceLastAck(500); // so jmx does the abort + super.testSlowConsumerIsAbortedViaJmx(); + } + + @Override + public void initCombosForTestSlowConsumerIsAborted() { + addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); + addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testZeroPrefetchConsumerIsAborted() throws Exception { + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + startProducers(destination, 20); + + Message message = consumer.receive(5000); + assertNotNull(message); + + try { + consumer.receive(20000); + fail("Slow consumer not aborted."); + } catch(Exception ex) { + } + } + + public void testIdleConsumerCanBeAbortedNoMessages() throws Exception { + AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest; + strategy.setIgnoreIdleConsumers(false); + + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + startProducers(destination, 20); + + try { + consumer.receive(20000); + fail("Idle consumer not aborted."); + } catch(Exception ex) { + } + } + + public void testIdleConsumerCanBeAborted() throws Exception { + AbortSlowAckConsumerStrategy strategy = (AbortSlowAckConsumerStrategy) underTest; + strategy.setIgnoreIdleConsumers(false); + + ActiveMQConnection conn = (ActiveMQConnection) createConnectionFactory().createConnection(); + conn.setExceptionListener(this); + connections.add(conn); + + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(destination); + assertNotNull(consumer); + conn.start(); + startProducers(destination, 20); + + Message message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + + try { + consumer.receive(20000); + fail("Slow consumer not aborted."); + } catch(Exception ex) { + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java index ad8a56bd2b..a263ebcc1f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java @@ -51,23 +51,25 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerTest.class); - AbortSlowConsumerStrategy underTest; - - public boolean abortConnection = false; - public long checkPeriod = 2 * 1000; - public long maxSlowDuration = 5 * 1000; - - private final List exceptions = new ArrayList(); + protected AbortSlowConsumerStrategy underTest; + protected boolean abortConnection = false; + protected long checkPeriod = 2 * 1000; + protected long maxSlowDuration = 5 * 1000; + protected final List exceptions = new ArrayList(); @Override protected void setUp() throws Exception { exceptions.clear(); topic = true; - underTest = new AbortSlowConsumerStrategy(); + underTest = createSlowConsumerStrategy(); super.setUp(); createDestination(); } + protected AbortSlowConsumerStrategy createSlowConsumerStrategy() { + return new AbortSlowConsumerStrategy(); + } + @Override protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker(); @@ -244,7 +246,6 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty()); } - public void initCombosForTestAbortAlreadyClosedConnection() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});