Update the consumer / connection abort process such that when the
strategy is configured to abort the connection is only attempt to do so
once instead of once for every subscription in the map.  Also improve
logging to better indicate the subscription being aborted and the
destination that the subscription was on.
This commit is contained in:
Timothy Bish 2013-09-12 15:07:13 -04:00
parent 272b846b0c
commit cdb7bb11ff
1 changed files with 64 additions and 27 deletions

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -106,38 +108,73 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
} }
protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) { protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
Map<Connection, List<Subscription>> abortMap = new HashMap<Connection, List<Subscription>>();
for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) { for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
ConnectionContext connectionContext = entry.getValue().context; ConnectionContext connectionContext = entry.getValue().context;
if (connectionContext!= null) { if (connectionContext == null) {
try { continue;
LOG.info("aborting " }
+ (abortSubscriberConnection ? "connection" : "consumer")
+ ", slow consumer: " + entry.getKey());
final Connection connection = connectionContext.getConnection(); Connection connection = connectionContext.getConnection();
if (connection != null) { if (connection == null) {
if (abortSubscriberConnection) { LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
scheduler.executeAfterDelay(new Runnable() { }
@Override
public void run() { if (!abortMap.containsKey(connection)) {
connection.serviceException(new InactivityIOException("Consumer was slow too often (>" abortMap.put(connection, new ArrayList<Subscription>());
+ maxSlowCount + ") or too long (>" }
+ maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
}}, 0l); abortMap.get(connection).add(entry.getKey());
} else { }
// just abort the consumer by telling it to stop
ConsumerControl stopConsumer = new ConsumerControl(); for (Entry<Connection, List<Subscription>> entry : abortMap.entrySet()) {
stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId()); final Connection connection = entry.getKey();
stopConsumer.setClose(true); final List<Subscription> subscriptions = entry.getValue();
connection.dispatchAsync(stopConsumer);
} if (abortSubscriberConnection) {
} else {
LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext); LOG.info("aborting connection:{} with {} slow consumers",
connection.getConnectionId(), subscriptions.size());
if (LOG.isTraceEnabled()) {
for (Subscription subscription : subscriptions) {
LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}",
new Object[] { connection.getConnectionId(),
subscription.getConsumerInfo().getConsumerId(),
subscription.getActiveMQDestination() });
} }
}
try {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
connection.serviceException(new InactivityIOException(
subscriptions.size() + " Consumers was slow too often (>"
+ maxSlowCount + ") or too long (>"
+ maxSlowDuration + "): "));
}}, 0l);
} catch (Exception e) { } catch (Exception e) {
LOG.info("exception on stopping " LOG.info("exception on aborting connection {} with {} slow consumers",
+ (abortSubscriberConnection ? "connection" : "consumer") connection.getConnectionId(), subscriptions.size());
+ " to abort slow consumer: " + entry.getKey(), e); }
} else {
// just abort each consumer by telling it to stop
for (Subscription subscription : subscriptions) {
LOG.info("aborting slow consumer: {} for destination:{}",
subscription.getConsumerInfo().getConsumerId(),
subscription.getActiveMQDestination());
try {
ConsumerControl stopConsumer = new ConsumerControl();
stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId());
stopConsumer.setClose(true);
connection.dispatchAsync(stopConsumer);
} catch (Exception e) {
LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId());
}
} }
} }
} }