From 25af8e62ccbe1abd1cdfed3347938a71523ee3e7 Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Mon, 16 Sep 2013 15:14:36 +0200 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4621 - some improvements to the AbortSlowAckConsumerStrategy --- .../policy/AbortSlowAckConsumerStrategy.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java index 9de3ea9c4a..dedb580d2a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowAckConsumerStrategy.java @@ -103,7 +103,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { private void updateSlowConsumersList(List subscribers) { for (Subscription subscriber : subscribers) { - if (isIgnoreNetworkSubscriptions() && subscriber.getConsumerInfo().isNetworkSubscription()) { if (slowConsumers.remove(subscriber) != null) { LOG.info("network sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId()); @@ -119,13 +118,20 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { continue; } + // don't mark consumers with no messages + if (subscriber.getInFlightSize() <= 0) { + continue; + } + long lastAckTime = subscriber.getTimeOfLastMessageAck(); long timeDelta = System.currentTimeMillis() - lastAckTime; if (timeDelta > maxTimeSinceLastAck) { if (!slowConsumers.containsKey(subscriber)) { LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId()); - slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext())); + SlowConsumerEntry entry = new SlowConsumerEntry(subscriber.getContext()); + entry.mark(); // mark consumer on first run + slowConsumers.put(subscriber, entry); } else if (getMaxSlowCount() > 0) { slowConsumers.get(subscriber).slow(); } @@ -140,10 +146,13 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy { private void abortAllQualifiedSlowConsumers() { HashMap toAbort = new HashMap(); for (Entry entry : slowConsumers.entrySet()) { - if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod() > getMaxSlowDuration()) || - getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount()) { + if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod() >= getMaxSlowDuration()) || + getMaxSlowCount() > 0 && entry.getValue().slowCount >= getMaxSlowCount()) { - LOG.trace("Transferring consumer{} to the abort list: {} slow duration = {}, slow count = {}", new Object[]{ entry.getKey().getConsumerInfo().getConsumerId(), entry.getValue().markCount * getCheckPeriod(), entry.getValue().getSlowCount() }); + LOG.trace("Transferring consumer {} to the abort list: " + + "slow duration = " + entry.getValue().markCount * getCheckPeriod() + ", " + + "slow count = " + entry.getValue().slowCount, + entry.getKey().getConsumerInfo().getConsumerId()); toAbort.put(entry.getKey(), entry.getValue()); slowConsumers.remove(entry.getKey());