resolve: https://issues.apache.org/activemq/browse/AMQ-2741 - visibility of abort slow consumer policy in via jmx

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@966319 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-21 17:09:33 +00:00
parent 12f0195913
commit 383d12ed0e
16 changed files with 541 additions and 97 deletions

View File

@ -657,7 +657,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
void doClose() throws JMSException {
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
if (LOG.isDebugEnabled()) {
LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
}
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.util.Map;
public class AbortSlowConsumerStrategyView implements AbortSlowConsumerStrategyViewMBean {
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategyView.class);
private ManagedRegionBroker broker;
private AbortSlowConsumerStrategy strategy;
public AbortSlowConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowConsumerStrategy slowConsumerStrategy) {
this.broker = managedRegionBroker;
this.strategy = slowConsumerStrategy;
}
public long getMaxSlowCount() {
return strategy.getMaxSlowCount();
}
public void setMaxSlowCount(long maxSlowCount) {
strategy.setMaxSlowCount(maxSlowCount);
}
public long getMaxSlowDuration() {
return strategy.getMaxSlowDuration();
}
public void setMaxSlowDuration(long maxSlowDuration) {
strategy.setMaxSlowDuration(maxSlowDuration);
}
public long getCheckPeriod() {
return strategy.getCheckPeriod();
}
public TabularData getSlowConsumers() throws OpenDataException {
OpenTypeSupport.OpenTypeFactory factory = OpenTypeSupport.getFactory(SlowConsumerEntry.class);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("SlowConsumers", "Table of current slow Consumers", ct, new String[] {"subscription" });
TabularDataSupport rc = new TabularDataSupport(tt);
int index = 0;
Map<Subscription, SlowConsumerEntry> slowConsumers = strategy.getSlowConsumers();
for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
entry.getValue().setSubscription(broker.getSubscriberObjectName(entry.getKey()));
rc.put(OpenTypeSupport.convert(entry.getValue()));
}
return rc;
}
public void abortConsumer(ObjectName consumerToAbort) {
Subscription sub = broker.getSubscriber(consumerToAbort);
if (sub != null) {
LOG.info("aborting consumer via jmx: " + sub.getConsumerInfo().getConsumerId());
strategy.abortConsumer(sub, false);
} else {
LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
}
}
public void abortConnection(ObjectName consumerToAbort) {
Subscription sub = broker.getSubscriber(consumerToAbort);
if (sub != null) {
LOG.info("aborting consumer connection via jmx: " + sub.getConsumerInfo().getConsumerId().getConnectionId());
strategy.abortConsumer(sub, true);
} else {
LOG.warn("cannot resolve subscription matching name: " + consumerToAbort);
}
}
public void abortConsumer(String objectNameOfConsumerToAbort) {
abortConsumer(toObjectName(objectNameOfConsumerToAbort));
}
public void abortConnection(String objectNameOfConsumerToAbort) {
abortConnection(toObjectName(objectNameOfConsumerToAbort));
}
private ObjectName toObjectName(String objectName) {
ObjectName result = null;
try {
result = new ObjectName(objectName);
} catch (Exception e) {
LOG.warn("cannot create subscription ObjectName to abort, from string: " + objectName);
}
return result;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
public interface AbortSlowConsumerStrategyViewMBean {
@MBeanInfo("returns the current max slow count, -1 disables")
long getMaxSlowCount();
@MBeanInfo("sets the count after which a slow consumer will be aborted, -1 disables")
void setMaxSlowCount(long maxSlowCount);
@MBeanInfo("returns the current max slow (milliseconds) duration")
long getMaxSlowDuration();
@MBeanInfo("sets the duration (milliseconds) after which a continually slow consumer will be aborted")
void setMaxSlowDuration(long maxSlowDuration);
@MBeanInfo("returns the check period at which a sweep of consumers is done to determine continued slowness")
public long getCheckPeriod();
@MBeanInfo("returns the current list of slow consumers, Not HTML friendly")
TabularData getSlowConsumers() throws OpenDataException;
@MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
void abortConsumer(ObjectName consumerToAbort);
@MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
void abortConnection(ObjectName consumerToAbort);
@MBeanInfo("aborts the slow consumer gracefully by sending a shutdown control message to just that consumer")
void abortConsumer(String objectNameOfConsumerToAbort);
@MBeanInfo("aborts the slow consumer forcefully by shutting down it's connection, note: all other users of the connection will be affected")
void abortConnection(String objectNameOfConsumerToAbort);
}

View File

@ -39,6 +39,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -388,4 +390,13 @@ public class DestinationView implements DestinationViewMBean {
return answer;
}
public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
ObjectName result = null;
SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
}
return result;
}
}

View File

@ -332,4 +332,13 @@ public interface DestinationViewMBean {
@MBeanInfo("returns all the current subscription MBeans matching this destination")
ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException;
/**
* Returns the slow consumer strategy MBean for this destination
*
* @return the name of the slow consumer handler MBean for this destination
*/
@MBeanInfo("returns the optional slowConsumer handler MBeans for this destination")
ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException;
}

View File

@ -53,6 +53,8 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@ -260,10 +262,12 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected void unregisterDestination(ObjectName key) throws Exception {
topics.remove(key);
queues.remove(key);
temporaryQueues.remove(key);
temporaryTopics.remove(key);
DestinationView view = null;
removeAndRemember(topics, key, view);
removeAndRemember(queues, key, view);
removeAndRemember(temporaryQueues, key, view);
removeAndRemember(temporaryTopics, key, view);
if (registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
@ -272,6 +276,24 @@ public class ManagedRegionBroker extends RegionBroker {
LOG.debug("Failure reason: " + e, e);
}
}
if (view != null) {
key = view.getSlowConsumerStrategy();
if (key!= null && registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
}
}
private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
DestinationView candidate = map.remove(key);
if (candidate != null && view == null) {
view = candidate;
}
}
protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
@ -527,4 +549,42 @@ public class ManagedRegionBroker extends RegionBroker {
+ JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
return objectName;
}
public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
ObjectName objectName = null;
try {
objectName = createObjectName(strategy);
if (!registeredMBeans.contains(objectName)) {
AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
AnnotatedMBean.registerMBean(managementContext, view, objectName);
registeredMBeans.add(objectName);
}
} catch (Exception e) {
LOG.warn("Failed to register MBean: " + strategy);
LOG.debug("Failure reason: " + e, e);
}
return objectName;
}
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
+ "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
return objectName;
}
public ObjectName getSubscriberObjectName(Subscription key) {
return subscriptionMap.get(key);
}
public Subscription getSubscriber(ObjectName key) {
Subscription sub = null;
for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
if (entry.getValue().equals(key)) {
sub = entry.getKey();
break;
}
}
return sub;
}
}

View File

@ -41,6 +41,9 @@ import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.SlowConsumerEntry;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
@ -429,7 +432,30 @@ public final class OpenTypeSupport {
}
}
static class SlowConsumerEntryOpenTypeFactory extends AbstractOpenTypeFactory {
@Override
protected String getTypeName() {
return SlowConsumerEntry.class.getName();
}
@Override
protected void init() throws OpenDataException {
super.init();
addItem("subscription", "the subscription view", SimpleType.OBJECTNAME);
addItem("slowCount", "number of times deemed slow", SimpleType.INTEGER);
addItem("markCount", "number of periods remaining slow", SimpleType.INTEGER);
}
@Override
public Map<String, Object> getFields(Object o) throws OpenDataException {
SlowConsumerEntry entry = (SlowConsumerEntry) o;
Map<String, Object> rc = super.getFields(o);
rc.put("subscription", entry.getSubscription());
rc.put("slowCount", Integer.valueOf(entry.getSlowCount()));
rc.put("markCount", Integer.valueOf(entry.getMarkCount()));
return rc;
}
}
static {
OPEN_TYPE_FACTORIES.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
@ -439,6 +465,7 @@ public final class OpenTypeSupport {
OPEN_TYPE_FACTORIES.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(Job.class, new JobOpenTypeFactory());
OPEN_TYPE_FACTORIES.put(SlowConsumerEntry.class, new SlowConsumerEntryOpenTypeFactory());
}
private OpenTypeSupport() {
@ -448,7 +475,7 @@ public final class OpenTypeSupport {
return OPEN_TYPE_FACTORIES.get(clazz);
}
public static CompositeData convert(Message message) throws OpenDataException {
public static CompositeData convert(Object message) throws OpenDataException {
OpenTypeFactory f = getFactory(message.getClass());
if (f == null) {
throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName());

View File

@ -602,6 +602,10 @@ public abstract class BaseDestination implements Destination {
this.slowConsumerStrategy = slowConsumerStrategy;
}
public SlowConsumerStrategy getSlowConsumerStrategy() {
return this.slowConsumerStrategy;
}
public boolean isPrioritizedMessages() {
return this.prioritizedMessages;

View File

@ -23,6 +23,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -215,4 +216,6 @@ public interface Destination extends Service, Task {
void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
boolean isPrioritizedMessages();
SlowConsumerStrategy getSlowConsumerStrategy();
}

View File

@ -23,6 +23,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@ -273,4 +274,8 @@ public class DestinationFilter implements Destination {
public boolean isPrioritizedMessages() {
return next.isPrioritizedMessages();
}
public SlowConsumerStrategy getSlowConsumerStrategy() {
return next.getSlowConsumerStrategy();
}
}

View File

@ -407,8 +407,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* Checks an ack versus the contents of the dispatched list.
*
* @param ack
* @param firstAckedMsg
* @param lastAckedMsg
* @throws JMSException if it does not match
*/
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {

View File

@ -1,3 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.HashMap;
@ -5,6 +21,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
@ -23,7 +41,9 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerStrategy.class);
private String name = "AbortSlowConsumerStrategy@" + hashCode();
private Scheduler scheduler;
private Broker broker;
private final AtomicBoolean taskStarted = new AtomicBoolean(false);
private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
@ -32,10 +52,11 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
private long checkPeriod = 30*1000;
private boolean abortConnection = false;
public void setScheduler(Scheduler s) {
this.scheduler=s;
}
public void setBrokerService(Broker broker) {
this.scheduler = broker.getScheduler();
this.broker = broker;
}
public void slowConsumer(ConnectionContext context, Subscription subs) {
if (maxSlowCount < 0 && maxSlowDuration < 0) {
// nothing to do
@ -75,21 +96,25 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
slowConsumers.remove(entry.getKey());
}
}
abortSubscription(toAbort, abortConnection);
}
private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
ConnectionContext connectionContext = entry.getValue().context;
if (connectionContext!= null) {
try {
LOG.info("aborting "
+ (abortConnection ? "connection" : "consumer")
+ ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
LOG.info("aborting "
+ (abortSubscriberConnection ? "connection" : "consumer")
+ ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
final Connection connection = connectionContext.getConnection();
if (connection != null) {
if (abortConnection) {
if (connection != null) {
if (abortSubscriberConnection) {
scheduler.executeAfterDelay(new Runnable() {
public void run() {
connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
+ maxSlowCount + ") or too long (>"
+ maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
}}, 0l);
@ -97,21 +122,36 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
// just abort the consumer by telling it to stop
ConsumerControl stopConsumer = new ConsumerControl();
stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
stopConsumer.setClose(true);
stopConsumer.setClose(true);
connection.dispatchAsync(stopConsumer);
}
} else {
LOG.debug("slowConsumer abort ignored, no connection in context:" + connectionContext);
}
} catch (Exception e) {
LOG.info("exception on stopping "
+ (abortConnection ? "connection" : "consumer")
+ " to abort slow consumer: " + entry.getKey(), e);
LOG.info("exception on stopping "
+ (abortSubscriberConnection ? "connection" : "consumer")
+ " to abort slow consumer: " + entry.getKey(), e);
}
}
}
}
public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
if (sub != null) {
SlowConsumerEntry entry = slowConsumers.remove(sub);
if (entry != null) {
Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
toAbort.put(sub, entry);
abortSubscription(toAbort, abortSubscriberConnection);
} else {
LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
}
}
}
public long getMaxSlowCount() {
return maxSlowCount;
}
@ -120,7 +160,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
* number of times a subscription can be deemed slow before triggering abort
* effect depends on dispatch rate as slow determination is done on dispatch
*/
public void setMaxSlowCount(int maxSlowCount) {
public void setMaxSlowCount(long maxSlowCount) {
this.maxSlowCount = maxSlowCount;
}
@ -161,22 +201,15 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
this.abortConnection = abortConnection;
}
static class SlowConsumerEntry {
final ConnectionContext context;
int slowCount = 1;
int markCount = 0;
SlowConsumerEntry(ConnectionContext context) {
this.context = context;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void slow() {
slowCount++;
}
public void mark() {
markCount++;
}
public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
return slowConsumers;
}
}

View File

@ -157,7 +157,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
SlowConsumerStrategy scs = getSlowConsumerStrategy();
if (scs != null) {
scs.setScheduler(broker.getScheduler());
scs.setBrokerService(broker);
}
destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages());

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext;
public class SlowConsumerEntry {
final ConnectionContext context;
Object subscription;
int slowCount = 1;
int markCount = 0;
SlowConsumerEntry(ConnectionContext context) {
this.context = context;
}
public void slow() {
slowCount++;
}
public void mark() {
markCount++;
}
public void setSubscription(Object subscriptionObjectName) {
this.subscription = subscriptionObjectName;
}
public Object getSubscription() {
return subscription;
}
public int getSlowCount() {
return slowCount;
}
public int getMarkCount() {
return markCount;
}
}

View File

@ -1,8 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.thread.Scheduler;
/*
* a strategy for dealing with slow consumers
@ -10,6 +26,5 @@ import org.apache.activemq.thread.Scheduler;
public interface SlowConsumerStrategy {
void slowConsumer(ConnectionContext context, Subscription subs);
void setScheduler(Scheduler scheduler);
void setBrokerService(Broker broker);
}

View File

@ -16,41 +16,45 @@
*/
package org.apache.activemq.broker.policy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleClientsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
private static final Log LOG = LogFactory.getLog(AbortSlowConsumerTest.class);
AbortSlowConsumerStrategy underTest;
public boolean abortConnection = false;
public long checkPeriod = 2*1000;
public long maxSlowDuration = 5*1000;
public long checkPeriod = 2 * 1000;
public long maxSlowDuration = 5 * 1000;
private List<Throwable> exceptions = new ArrayList<Throwable>();
@Override
protected void setUp() throws Exception {
exceptions.clear();
@ -59,7 +63,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
super.setUp();
createDestination();
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
@ -79,7 +83,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
public void testRegularConsumerIsNotAborted() throws Exception {
startConsumers(destination);
for (Connection c: connections) {
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
@ -90,12 +94,12 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
public void initCombosForTestLittleSlowConsumerIsNotAborted() {
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testLittleSlowConsumerIsNotAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(500);
for (Connection c: connections) {
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 12);
@ -103,56 +107,102 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
allMessagesList.assertAtLeastMessagesReceived(10);
}
public void initCombosForTestSlowConsumerIsAborted() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testSlowConsumerIsAborted() throws Exception {
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8*1000);
for (Connection c: connections) {
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
consumertoAbort.getValue().assertMessagesReceived(1);
TimeUnit.SECONDS.sleep(5);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
}
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
consumertoAbort.getValue().assertMessagesReceived(1);
ActiveMQDestination amqDest = (ActiveMQDestination)destination;
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" +
(amqDest.isTopic() ? "Topic" : "Queue") +",Destination="
+ amqDest.getPhysicalName() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
assertNotNull(slowConsumerPolicyMBeanName);
AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)
broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
TimeUnit.SECONDS.sleep(3);
TabularData slowOnes = abortPolicy.getSlowConsumers();
assertEquals("one slow consumers", 1, slowOnes.size());
LOG.info("slow ones:" + slowOnes);
CompositeData slowOne = (CompositeData) slowOnes.values().iterator().next();
LOG.info("Slow one: " + slowOne);
assertTrue("we have an object name", slowOne.get("subscription") instanceof ObjectName);
abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
slowOnes = abortPolicy.getSlowConsumers();
assertEquals("no slow consumers left", 0, slowOnes.size());
}
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
consumerCount = 10;
startConsumers(destination);
Entry<MessageConsumer, MessageIdList> consumertoAbort = consumers.entrySet().iterator().next();
consumertoAbort.getValue().setProcessingDelay(8*1000);
for (Connection c: connections) {
consumertoAbort.getValue().setProcessingDelay(8 * 1000);
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
allMessagesList.waitForMessagesToArrive(99);
allMessagesList.assertAtLeastMessagesReceived(99);
consumertoAbort.getValue().assertMessagesReceived(1);
TimeUnit.SECONDS.sleep(5);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
}
public void testAbortAlreadyClosingConsumers() throws Exception {
consumerCount = 1;
startConsumers(destination);
for (MessageIdList list : consumers.values()) {
list.setProcessingDelay(6*1000);
list.setProcessingDelay(6 * 1000);
}
for (Connection c: connections) {
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
@ -164,12 +214,12 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
consumer.close();
}
}
public void initCombosForTestAbortAlreadyClosedConsumers() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testAbortAlreadyClosedConsumers() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
@ -182,17 +232,17 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
TimeUnit.SECONDS.sleep(1);
LOG.info("closing consumer: " + consumer);
consumer.close();
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
public void initCombosForTestAbortAlreadyClosedConnection() {
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testAbortAlreadyClosedConnection() throws Exception {
Connection conn = createConnectionFactory().createConnection();
conn.setExceptionListener(this);
@ -204,7 +254,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
TimeUnit.SECONDS.sleep(1);
LOG.info("closing connection: " + conn);
conn.close();
TimeUnit.SECONDS.sleep(5);
assertTrue("no exceptions : " + exceptions.toArray(), exceptions.isEmpty());
}
@ -212,12 +262,12 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
public void testAbortConsumerOnDeadConnection() throws Exception {
// socket proxy on pause, close could hang??
}
public void onException(JMSException exception) {
exceptions.add(exception);
exception.printStackTrace();
exception.printStackTrace();
}
public static Test suite() {
return suite(AbortSlowConsumerTest.class);
}