mirror of https://github.com/apache/activemq.git
code fix and test fix for: https://issues.apache.org/jira/browse/AMQ-4225
AbortSlowConsumerStrategy was broken due to a malformed ObjectName being created when the strategy was registered on-demand. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1428051 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7a95e1809b
commit
e60087608e
|
@ -19,7 +19,6 @@ package org.apache.activemq.broker.jmx;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Hashtable;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -40,12 +39,23 @@ import javax.management.openmbean.OpenDataException;
|
||||||
import javax.management.openmbean.TabularData;
|
import javax.management.openmbean.TabularData;
|
||||||
import javax.management.openmbean.TabularDataSupport;
|
import javax.management.openmbean.TabularDataSupport;
|
||||||
import javax.management.openmbean.TabularType;
|
import javax.management.openmbean.TabularType;
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||||
import org.apache.activemq.broker.region.*;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.broker.region.DestinationFactory;
|
||||||
|
import org.apache.activemq.broker.region.DestinationFactoryImpl;
|
||||||
|
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||||
|
import org.apache.activemq.broker.region.Queue;
|
||||||
|
import org.apache.activemq.broker.region.Region;
|
||||||
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
import org.apache.activemq.broker.region.Topic;
|
||||||
|
import org.apache.activemq.broker.region.TopicRegion;
|
||||||
|
import org.apache.activemq.broker.region.TopicSubscription;
|
||||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
@ -568,19 +578,23 @@ public class ManagedRegionBroker extends RegionBroker {
|
||||||
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
|
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
|
||||||
TopicMessageStore store = adapter.createTopicMessageStore(topic);
|
TopicMessageStore store = adapter.createTopicMessageStore(topic);
|
||||||
store.recover(new MessageRecoveryListener() {
|
store.recover(new MessageRecoveryListener() {
|
||||||
|
@Override
|
||||||
public boolean recoverMessage(Message message) throws Exception {
|
public boolean recoverMessage(Message message) throws Exception {
|
||||||
result.add(message);
|
result.add(message);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
|
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
|
||||||
throw new RuntimeException("Should not be called.");
|
throw new RuntimeException("Should not be called.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean hasSpace() {
|
public boolean hasSpace() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDuplicate(MessageId id) {
|
public boolean isDuplicate(MessageId id) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -760,8 +774,7 @@ public class ManagedRegionBroker extends RegionBroker {
|
||||||
|
|
||||||
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
|
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
|
||||||
String objectNameStr = this.brokerObjectName.toString();
|
String objectNameStr = this.brokerObjectName.toString();
|
||||||
objectNameStr += "Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName());
|
objectNameStr += ",Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName());
|
||||||
Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
|
|
||||||
ObjectName objectName = new ObjectName(objectNameStr);
|
ObjectName objectName = new ObjectName(objectNameStr);
|
||||||
return objectName;
|
return objectName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,12 +16,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.policy;
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ExceptionListener;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Session;
|
||||||
import javax.management.InstanceNotFoundException;
|
import javax.management.InstanceNotFoundException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.openmbean.CompositeData;
|
||||||
|
import javax.management.openmbean.TabularData;
|
||||||
|
|
||||||
import junit.framework.Test;
|
import junit.framework.Test;
|
||||||
|
|
||||||
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
import org.apache.activemq.JmsMultipleClientsTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
|
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
|
||||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
@ -30,20 +46,6 @@ import org.apache.activemq.util.MessageIdList;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
|
||||||
import javax.jms.ExceptionListener;
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.MessageConsumer;
|
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.management.ObjectName;
|
|
||||||
import javax.management.openmbean.CompositeData;
|
|
||||||
import javax.management.openmbean.TabularData;
|
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
|
|
||||||
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
|
public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport implements ExceptionListener {
|
||||||
|
|
||||||
|
@ -55,7 +57,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
public long checkPeriod = 2 * 1000;
|
public long checkPeriod = 2 * 1000;
|
||||||
public long maxSlowDuration = 5 * 1000;
|
public long maxSlowDuration = 5 * 1000;
|
||||||
|
|
||||||
private List<Throwable> exceptions = new ArrayList<Throwable>();
|
private final List<Throwable> exceptions = new ArrayList<Throwable>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
|
@ -109,7 +111,6 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
allMessagesList.assertAtLeastMessagesReceived(10);
|
allMessagesList.assertAtLeastMessagesReceived(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void initCombosForTestSlowConsumerIsAborted() {
|
public void initCombosForTestSlowConsumerIsAborted() {
|
||||||
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
addCombinationValues("abortConnection", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
addCombinationValues("topic", new Object[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
@ -125,13 +126,10 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
startProducers(destination, 100);
|
startProducers(destination, 100);
|
||||||
|
|
||||||
consumertoAbort.getValue().assertMessagesReceived(1);
|
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(5);
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
|
public void testSlowConsumerIsAbortedViaJmx() throws Exception {
|
||||||
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
|
underTest.setMaxSlowDuration(60*1000); // so jmx does the abort
|
||||||
startConsumers(destination);
|
startConsumers(destination);
|
||||||
|
@ -145,11 +143,11 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
consumertoAbort.getValue().assertMessagesReceived(1);
|
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||||
|
|
||||||
ActiveMQDestination amqDest = (ActiveMQDestination)destination;
|
ActiveMQDestination amqDest = (ActiveMQDestination)destination;
|
||||||
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=" +
|
ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" +
|
||||||
(amqDest.isTopic() ? "Topic" : "Queue") +",Destination="
|
(amqDest.isTopic() ? "Topic" : "Queue") +",destinationName="
|
||||||
+ amqDest.getPhysicalName() + ",BrokerName=localhost");
|
+ amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
|
||||||
|
|
||||||
QueueViewMBean queue = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
DestinationViewMBean queue = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
|
||||||
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
|
ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
|
||||||
|
|
||||||
assertNotNull(slowConsumerPolicyMBeanName);
|
assertNotNull(slowConsumerPolicyMBeanName);
|
||||||
|
@ -185,10 +183,8 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
assertTrue("correct exception: " + expected.getCause(),
|
assertTrue("correct exception: " + expected.getCause(),
|
||||||
expected.getCause() instanceof InstanceNotFoundException);
|
expected.getCause() instanceof InstanceNotFoundException);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
|
public void testOnlyOneSlowConsumerIsAborted() throws Exception {
|
||||||
consumerCount = 10;
|
consumerCount = 10;
|
||||||
startConsumers(destination);
|
startConsumers(destination);
|
||||||
|
@ -203,9 +199,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
allMessagesList.assertAtLeastMessagesReceived(99);
|
allMessagesList.assertAtLeastMessagesReceived(99);
|
||||||
|
|
||||||
consumertoAbort.getValue().assertMessagesReceived(1);
|
consumertoAbort.getValue().assertMessagesReceived(1);
|
||||||
|
|
||||||
TimeUnit.SECONDS.sleep(5);
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
consumertoAbort.getValue().assertAtMostMessagesReceived(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,6 +270,7 @@ public class AbortSlowConsumerTest extends JmsMultipleClientsTestSupport impleme
|
||||||
// socket proxy on pause, close could hang??
|
// socket proxy on pause, close could hang??
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void onException(JMSException exception) {
|
public void onException(JMSException exception) {
|
||||||
exceptions.add(exception);
|
exceptions.add(exception);
|
||||||
exception.printStackTrace();
|
exception.printStackTrace();
|
||||||
|
|
Loading…
Reference in New Issue