diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index ae723c7ba8..c26d1311c4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -19,7 +19,6 @@ package org.apache.activemq.broker.jmx; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -40,12 +39,23 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; -import org.apache.activemq.broker.region.*; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFactory; +import org.apache.activemq.broker.region.DestinationFactoryImpl; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Region; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -568,19 +578,23 @@ public class ManagedRegionBroker extends RegionBroker { ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); TopicMessageStore store = adapter.createTopicMessageStore(topic); store.recover(new MessageRecoveryListener() { + @Override public boolean recoverMessage(Message message) throws Exception { result.add(message); return true; } + @Override public boolean recoverMessageReference(MessageId messageReference) throws Exception { throw new RuntimeException("Should not be called."); } + @Override public boolean hasSpace() { return true; } + @Override public boolean isDuplicate(MessageId id) { return false; } @@ -760,8 +774,7 @@ public class ManagedRegionBroker extends RegionBroker { private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{ String objectNameStr = this.brokerObjectName.toString(); - objectNameStr += "Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName()); - Hashtable map = brokerObjectName.getKeyPropertyList(); + objectNameStr += ",Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName()); ObjectName objectName = new ObjectName(objectNameStr); return objectName; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java index 583fe83d6a..ad8a56bd2b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/AbortSlowConsumerTest.java @@ -16,12 +16,28 @@ */ package org.apache.activemq.broker.policy; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + import junit.framework.Test; + import org.apache.activemq.JmsMultipleClientsTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean; -import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -30,20 +46,6 @@ import org.apache.activemq.util.MessageIdList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; - public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener { @@ -55,7 +57,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme public long checkPeriod = 2 * 1000; public long maxSlowDuration = 5 * 1000; - private List exceptions = new ArrayList(); + private final List exceptions = new ArrayList(); @Override protected void setUp() throws Exception { @@ -109,7 +111,6 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme allMessagesList.assertAtLeastMessagesReceived(10); } - public void initCombosForTestSlowConsumerIsAborted() { addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE}); addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE}); @@ -125,13 +126,10 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme startProducers(destination, 100); consumertoAbort.getValue().assertMessagesReceived(1); - TimeUnit.SECONDS.sleep(5); - consumertoAbort.getValue().assertAtMostMessagesReceived(1); } - public void testSlowConsumerIsAbortedViaJmx() throws Exception { underTest.setMaxSlowDuration(60*1000); // so jmx does the abort startConsumers(destination); @@ -145,11 +143,11 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme consumertoAbort.getValue().assertMessagesReceived(1); ActiveMQDestination amqDest = (ActiveMQDestination)destination; - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" + - (amqDest.isTopic() ? "Topic" : "Queue") +",Destination=" - + amqDest.getPhysicalName() + ",BrokerName=localhost"); + ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + + (amqDest.isTopic() ? "Topic" : "Queue") +",destinationName=" + + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost"); - QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true); ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy(); assertNotNull(slowConsumerPolicyMBeanName); @@ -185,10 +183,8 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme assertTrue("correct exception: " + expected.getCause(), expected.getCause() instanceof InstanceNotFoundException); } - } - public void testOnlyOneSlowConsumerIsAborted() throws Exception { consumerCount = 10; startConsumers(destination); @@ -203,9 +199,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme allMessagesList.assertAtLeastMessagesReceived(99); consumertoAbort.getValue().assertMessagesReceived(1); - TimeUnit.SECONDS.sleep(5); - consumertoAbort.getValue().assertAtMostMessagesReceived(1); } @@ -276,6 +270,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme // socket proxy on pause, close could hang?? } + @Override public void onException(JMSException exception) { exceptions.add(exception); exception.printStackTrace();