Call to the statistics instance of the subscription to reset the
counters.
(cherry picked from commit 564d55023e)
This commit is contained in:
Timothy Bish 2015-12-21 17:03:24 -05:00
parent 7dd2330011
commit 871f0a6005
2 changed files with 107 additions and 18 deletions

View File

@ -420,8 +420,8 @@ public class SubscriptionView implements SubscriptionViewMBean {
@Override @Override
public void resetStatistics() { public void resetStatistics() {
if (subscription != null){ if (subscription != null && subscription.getSubscriptionStatistics() != null){
subscription.resetConsumedCount(); subscription.getSubscriptionStatistics().reset();
} }
} }

View File

@ -26,7 +26,18 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.*; import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler; import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
@ -51,11 +62,9 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.memory.list.MessageList;
import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.Ignore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -65,6 +74,7 @@ import org.slf4j.LoggerFactory;
* command line application. * command line application.
*/ */
public class MBeanTest extends EmbeddedBrokerTestSupport { public class MBeanTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class); private static final Logger LOG = LoggerFactory.getLogger(MBeanTest.class);
private static boolean waitForKeyPress; private static boolean waitForKeyPress;
@ -171,7 +181,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
} }
//Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752" //Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
// points to the need to except on a duplicate or have store.addMessage return bool // points to the need to except on a duplicate or have store.addMessage return boolean
// need some thought on how best to resolve this // need some thought on how best to resolve this
public void Broken_testMoveDuplicateDoesNotDelete() throws Exception { public void Broken_testMoveDuplicateDoesNotDelete() throws Exception {
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();
@ -1437,7 +1447,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
String messageID = (String) cdata.get("JMSMessageID"); String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID); assertNotNull("Should have a message ID for message " + i, messageID);
Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); @SuppressWarnings("unchecked")
Map<Object, Object> intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
assertTrue("not empty", intProperties.size() > 0); assertTrue("not empty", intProperties.size() > 0);
assertEquals("counter in order", i, intProperties.get("counter")); assertEquals("counter in order", i, intProperties.get("counter"));
} }
@ -1464,7 +1475,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
for (int i = 0; i < messageCount - 4; i++) { for (int i = 0; i < messageCount - 4; i++) {
CompositeData cdata = compdatalist[i]; CompositeData cdata = compdatalist[i];
Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); @SuppressWarnings("unchecked")
Map<Object, Object> intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
assertTrue("not empty", intProperties.size() > 0); assertTrue("not empty", intProperties.size() > 0);
assertEquals("counter in order", i + 5, intProperties.get("counter")); assertEquals("counter in order", i + 5, intProperties.get("counter"));
echo("Got: " + intProperties.get("counter")); echo("Got: " + intProperties.get("counter"));
@ -1476,7 +1488,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost"); ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean brokerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); BrokerViewMBean brokerView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
Map connectors = brokerView.getTransportConnectors(); Map<String, String> connectors = brokerView.getTransportConnectors();
LOG.info("Connectors: " + connectors); LOG.info("Connectors: " + connectors);
assertEquals("one connector", 1, connectors.size()); assertEquals("one connector", 1, connectors.size());
@ -1656,13 +1668,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic"); ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true); final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true);
ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList<SubscriptionViewMBean>();
assertEquals(1, topicView.getEnqueueCount());
assertEquals(4, topicView.getDispatchCount());
assertEquals(4, topicView.getInFlightCount());
assertEquals(0, topicView.getDequeueCount());
ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList();
for (ObjectName name : topicView.getSubscriptions()) { for (ObjectName name : topicView.getSubscriptions()) {
subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true)); subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true));
} }
@ -1688,6 +1694,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
} }
}); });
for (SubscriptionViewMBean subscriberView : subscriberViews) {
assertEquals(1, subscriberView.getEnqueueCounter());
assertEquals(1, subscriberView.getDispatchedCounter());
assertEquals(1, subscriberView.getDequeueCounter());
}
for (SubscriptionViewMBean subscriberView : subscriberViews) {
subscriberView.resetStatistics();
}
for (SubscriptionViewMBean subscriberView : subscriberViews) {
assertEquals(0, subscriberView.getEnqueueCounter());
assertEquals(0, subscriberView.getDispatchedCounter());
assertEquals(0, subscriberView.getDequeueCounter());
}
}
public void testSubscriptionView() throws Exception {
connection = connectionFactory.createConnection();
connection.setClientID("test");
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
Topic singleTopic = session.createTopic("test.topic");
Topic wildcardTopic = session.createTopic("test.>");
TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single");
TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard");
MessageConsumer consumer1 = session.createConsumer(singleTopic);
MessageConsumer consumer2 = session.createConsumer(wildcardTopic);
final ArrayList<Message> messages = new ArrayList<>();
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
messages.add(message);
}
};
durable1.setMessageListener(listener);
durable2.setMessageListener(listener);
consumer1.setMessageListener(listener);
consumer2.setMessageListener(listener);
MessageProducer producer = session.createProducer(singleTopic);
producer.send(session.createTextMessage("test"));
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return messages.size() == 4;
}
});
ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true);
ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList<SubscriptionViewMBean>();
for (ObjectName name : topicView.getSubscriptions()) {
subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true));
}
assertEquals(4, subscriberViews.size());
for (SubscriptionViewMBean subscriberView : subscriberViews) {
assertEquals(1, subscriberView.getEnqueueCounter());
assertEquals(1, subscriberView.getDispatchedCounter());
assertEquals(0, subscriberView.getDequeueCounter());
}
for (Message message : messages) {
try {
message.acknowledge();
} catch (JMSException ignore) {}
}
// Wait so that each subscription gets updated
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return topicView.getDequeueCount() == 4;
}
});
assertEquals(1, topicView.getEnqueueCount()); assertEquals(1, topicView.getEnqueueCount());
assertEquals(4, topicView.getDispatchCount()); assertEquals(4, topicView.getDispatchCount());
assertEquals(0, topicView.getInFlightCount()); assertEquals(0, topicView.getInFlightCount());
@ -1698,7 +1789,5 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals(1, subscriberView.getDispatchedCounter()); assertEquals(1, subscriberView.getDispatchedCounter());
assertEquals(1, subscriberView.getDequeueCounter()); assertEquals(1, subscriberView.getDequeueCounter());
} }
} }
} }