remove dependency on jmx and revert to direct calls on destinations to get stats and view on counters - resolve intermittent failures on hudson with a bit of luck as jmx introduces lots of timing issues

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905582 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-02 11:22:28 +00:00
parent 62daac4b31
commit 59653a50f8
1 changed files with 67 additions and 79 deletions

View File

@ -26,15 +26,14 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
@ -125,61 +124,61 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producingThread.join(); producingThread.join();
session.close(); session.close();
final DestinationViewMBean view = createView(destination); final DestinationStatistics view = this.getDestinationStatistics(destination);
// wait for all to inflight to expire // wait for all to inflight to expire
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return view.getInFlightCount() == 0; return view.getInflight().getCount() == 0;
} }
})); }));
assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount()); assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount());
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
// wait for all sent to get delivered and expire // wait for all sent to get delivered and expire
assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() { assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
long oldEnqueues = view.getEnqueueCount(); long oldEnqueues = view.getEnqueues().getCount();
Thread.sleep(200); Thread.sleep(200);
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
return oldEnqueues == view.getEnqueueCount(); return oldEnqueues == view.getEnqueues().getCount();
} }
}, 60*1000)); }, 60*1000));
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
assertTrue("got at least what did not expire", received.get() >= view.getDequeueCount() - view.getExpiredCount()); assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount() - view.getExpired().getCount());
assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() { assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount());
return view.getQueueSize() == 0; return view.getMessages().getCount() == 0;
} }
})); }));
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueueCount(); final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
final long totalExpiredCount = view.getExpiredCount() + expiredBeforeEnqueue; final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
final DestinationViewMBean dlqView = createView(dlqDestination); final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination);
LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " + dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount() LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount()
+ ", dispatched: " + dlqView.getDispatchCount() + ", inflight: " + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount()); + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount());
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return totalExpiredCount == dlqView.getQueueSize(); return totalExpiredCount == dlqView.getMessages().getCount();
} }
}); });
assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getQueueSize()); assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount());
// memory check // memory check
assertEquals("memory usage is back to duck egg", 0, view.getMemoryPercentUsage()); assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage());
assertTrue("memory usage is increased ", 0 < dlqView.getMemoryPercentUsage()); assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage());
// verify DLQ // verify DLQ
MessageConsumer dlqConsumer = createDlqConsumer(connection); MessageConsumer dlqConsumer = createDlqConsumer(connection);
@ -244,13 +243,13 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producingThread.start(); producingThread.start();
producingThread.join(); producingThread.join();
DestinationViewMBean view = createView(destination); DestinationStatistics view = getDestinationStatistics(destination);
LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getDequeueCount() + ", dequeues: " + view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeueCount() + ", dispatched: " + view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatchCount() + ", inflight: " + view.getDispatched().getCount() + ", inflight: "
+ view.getInFlightCount() + ", expiries: " + view.getInflight().getCount() + ", expiries: "
+ view.getExpiredCount()); + view.getExpired().getCount());
LOG.info("stopping broker"); LOG.info("stopping broker");
broker.stop(); broker.stop();
@ -264,26 +263,21 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
boolean result = false; DestinationStatistics view = getDestinationStatistics(destination);
try { LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
DestinationViewMBean view = createView(destination); + view.getEnqueues().getCount() + ", dequeues: "
LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " + view.getDequeues().getCount() + ", dispatched: "
+ view.getDequeueCount() + ", dequeues: " + view.getDispatched().getCount() + ", inflight: "
+ view.getDequeueCount() + ", dispatched: " + view.getInflight().getCount() + ", expiries: "
+ view.getDispatchCount() + ", inflight: " + view.getExpired().getCount());
+ view.getInFlightCount() + ", expiries: "
+ view.getExpiredCount()); return view.getMessages().getCount() == 0;
result = view.getQueueSize() == 0;
} catch (Exception notFoundExpectedOnSlowMachines) {
}
return result;
} }
}); });
view = createView(destination); view = getDestinationStatistics(destination);
assertEquals("Expect empty queue, QueueSize: ", 0, view.getQueueSize()); assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount()); assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
} }
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
@ -309,33 +303,27 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();
return broker; return broker;
} }
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) {
String domain = "org.apache.activemq"; DestinationStatistics result = null;
ObjectName name; org.apache.activemq.broker.region.Destination dest = getDestination(destination);
if (destination.isQueue()) { if (dest != null) {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" result = dest.getDestinationStatistics();
+ destination.getPhysicalName());
} else {
name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination="
+ destination.getPhysicalName());
} }
final DestinationViewMBean view = (DestinationViewMBean) return result;
broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); }
assertTrue("validation: Mbean view for " + destination + " exists", Wait.waitFor(new Wait.Condition() { private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) {
public boolean isSatisified() throws Exception { org.apache.activemq.broker.region.Destination result = null;
boolean mbeanExists = false; RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
try { for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
view.getConsumerCount(); if (dest.getName().equals(destination.getPhysicalName())) {
mbeanExists = true; result = dest;
} catch (Exception notFoundExpectedOnSlowMachines) { break;
} }
return mbeanExists; }
} return result;
}));
return view;
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {