diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java index 263ca037ce..46f68d8168 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbortSlowConsumerStrategy.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.broker.region.policy; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -106,38 +108,73 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable } protected void abortSubscription(Map toAbort, boolean abortSubscriberConnection) { + + Map> abortMap = new HashMap>(); + for (final Entry entry : toAbort.entrySet()) { ConnectionContext connectionContext = entry.getValue().context; - if (connectionContext!= null) { - try { - LOG.info("aborting " - + (abortSubscriberConnection ? "connection" : "consumer") - + ", slow consumer: " + entry.getKey()); + if (connectionContext == null) { + continue; + } - final Connection connection = connectionContext.getConnection(); - if (connection != null) { - if (abortSubscriberConnection) { - scheduler.executeAfterDelay(new Runnable() { - @Override - public void run() { - connection.serviceException(new InactivityIOException("Consumer was slow too often (>" - + maxSlowCount + ") or too long (>" - + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId())); - }}, 0l); - } else { - // just abort the consumer by telling it to stop - ConsumerControl stopConsumer = new ConsumerControl(); - stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId()); - stopConsumer.setClose(true); - connection.dispatchAsync(stopConsumer); - } - } else { - LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); + Connection connection = connectionContext.getConnection(); + if (connection == null) { + LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); + } + + if (!abortMap.containsKey(connection)) { + abortMap.put(connection, new ArrayList()); + } + + abortMap.get(connection).add(entry.getKey()); + } + + for (Entry> entry : abortMap.entrySet()) { + final Connection connection = entry.getKey(); + final List subscriptions = entry.getValue(); + + if (abortSubscriberConnection) { + + LOG.info("aborting connection:{} with {} slow consumers", + connection.getConnectionId(), subscriptions.size()); + + if (LOG.isTraceEnabled()) { + for (Subscription subscription : subscriptions) { + LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}", + new Object[] { connection.getConnectionId(), + subscription.getConsumerInfo().getConsumerId(), + subscription.getActiveMQDestination() }); } + } + + try { + scheduler.executeAfterDelay(new Runnable() { + @Override + public void run() { + connection.serviceException(new InactivityIOException( + subscriptions.size() + " Consumers was slow too often (>" + + maxSlowCount + ") or too long (>" + + maxSlowDuration + "): ")); + }}, 0l); } catch (Exception e) { - LOG.info("exception on stopping " - + (abortSubscriberConnection ? "connection" : "consumer") - + " to abort slow consumer: " + entry.getKey(), e); + LOG.info("exception on aborting connection {} with {} slow consumers", + connection.getConnectionId(), subscriptions.size()); + } + } else { + // just abort each consumer by telling it to stop + for (Subscription subscription : subscriptions) { + LOG.info("aborting slow consumer: {} for destination:{}", + subscription.getConsumerInfo().getConsumerId(), + subscription.getActiveMQDestination()); + + try { + ConsumerControl stopConsumer = new ConsumerControl(); + stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId()); + stopConsumer.setClose(true); + connection.dispatchAsync(stopConsumer); + } catch (Exception e) { + LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId()); + } } } }