https://issues.apache.org/jira/browse/AMQ-4621 - some improvements to the AbortSlowAckConsumerStrategy

This commit is contained in:
Dejan Bosanac 2013-09-16 15:14:36 +02:00
parent e1bbde7302
commit 25af8e62cc
1 changed files with 14 additions and 5 deletions

View File

@ -103,7 +103,6 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
private void updateSlowConsumersList(List<Subscription> subscribers) { private void updateSlowConsumersList(List<Subscription> subscribers) {
for (Subscription subscriber : subscribers) { for (Subscription subscriber : subscribers) {
if (isIgnoreNetworkSubscriptions() && subscriber.getConsumerInfo().isNetworkSubscription()) { if (isIgnoreNetworkSubscriptions() && subscriber.getConsumerInfo().isNetworkSubscription()) {
if (slowConsumers.remove(subscriber) != null) { if (slowConsumers.remove(subscriber) != null) {
LOG.info("network sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId()); LOG.info("network sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
@ -119,13 +118,20 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
continue; continue;
} }
// don't mark consumers with no messages
if (subscriber.getInFlightSize() <= 0) {
continue;
}
long lastAckTime = subscriber.getTimeOfLastMessageAck(); long lastAckTime = subscriber.getTimeOfLastMessageAck();
long timeDelta = System.currentTimeMillis() - lastAckTime; long timeDelta = System.currentTimeMillis() - lastAckTime;
if (timeDelta > maxTimeSinceLastAck) { if (timeDelta > maxTimeSinceLastAck) {
if (!slowConsumers.containsKey(subscriber)) { if (!slowConsumers.containsKey(subscriber)) {
LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId()); LOG.debug("sub: {} is now slow", subscriber.getConsumerInfo().getConsumerId());
slowConsumers.put(subscriber, new SlowConsumerEntry(subscriber.getContext())); SlowConsumerEntry entry = new SlowConsumerEntry(subscriber.getContext());
entry.mark(); // mark consumer on first run
slowConsumers.put(subscriber, entry);
} else if (getMaxSlowCount() > 0) { } else if (getMaxSlowCount() > 0) {
slowConsumers.get(subscriber).slow(); slowConsumers.get(subscriber).slow();
} }
@ -140,10 +146,13 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
private void abortAllQualifiedSlowConsumers() { private void abortAllQualifiedSlowConsumers() {
HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>(); HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) { for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod() > getMaxSlowDuration()) || if (getMaxSlowDuration() > 0 && (entry.getValue().markCount * getCheckPeriod() >= getMaxSlowDuration()) ||
getMaxSlowCount() > 0 && entry.getValue().slowCount > getMaxSlowCount()) { getMaxSlowCount() > 0 && entry.getValue().slowCount >= getMaxSlowCount()) {
LOG.trace("Transferring consumer{} to the abort list: {} slow duration = {}, slow count = {}", new Object[]{ entry.getKey().getConsumerInfo().getConsumerId(), entry.getValue().markCount * getCheckPeriod(), entry.getValue().getSlowCount() }); LOG.trace("Transferring consumer {} to the abort list: " +
"slow duration = " + entry.getValue().markCount * getCheckPeriod() + ", " +
"slow count = " + entry.getValue().slowCount,
entry.getKey().getConsumerInfo().getConsumerId());
toAbort.put(entry.getKey(), entry.getValue()); toAbort.put(entry.getKey(), entry.getValue());
slowConsumers.remove(entry.getKey()); slowConsumers.remove(entry.getKey());