diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 26ba271ef8..89abafd191 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -26,15 +26,14 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.management.InstanceNotFoundException; -import javax.management.ObjectName; import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; @@ -125,61 +124,61 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.join(); session.close(); - final DestinationViewMBean view = createView(destination); - + final DestinationStatistics view = this.getDestinationStatistics(destination); + // wait for all to inflight to expire assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return view.getInFlightCount() == 0; + return view.getInflight().getCount() == 0; } })); - assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount()); + assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount()); - LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); // wait for all sent to get delivered and expire assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - long oldEnqueues = view.getEnqueueCount(); + long oldEnqueues = view.getEnqueues().getCount(); Thread.sleep(200); - LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); - return oldEnqueues == view.getEnqueueCount(); + LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getDequeues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); + return oldEnqueues == view.getEnqueues().getCount(); } }, 60*1000)); - LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); - assertTrue("got at least what did not expire", received.get() >= view.getDequeueCount() - view.getExpiredCount()); + assertTrue("got at least what did not expire", received.get() >= view.getDequeues().getCount() - view.getExpired().getCount()); - assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() { + assertTrue("all messages expired - queue size gone to zero " + view.getMessages().getCount(), Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); - return view.getQueueSize() == 0; + LOG.info("Stats: received: " + received.get() + ", size= " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expiries: " + view.getExpired().getCount()); + return view.getMessages().getCount() == 0; } })); - final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueueCount(); - final long totalExpiredCount = view.getExpiredCount() + expiredBeforeEnqueue; + final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount(); + final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue; - final DestinationViewMBean dlqView = createView(dlqDestination); - LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " + dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount() - + ", dispatched: " + dlqView.getDispatchCount() + ", inflight: " + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount()); + final DestinationStatistics dlqView = getDestinationStatistics(dlqDestination); + LOG.info("DLQ stats: size= " + dlqView.getMessages().getCount() + ", enqueues: " + dlqView.getDequeues().getCount() + ", dequeues: " + dlqView.getDequeues().getCount() + + ", dispatched: " + dlqView.getDispatched().getCount() + ", inflight: " + dlqView.getInflight().getCount() + ", expiries: " + dlqView.getExpired().getCount()); Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - return totalExpiredCount == dlqView.getQueueSize(); + return totalExpiredCount == dlqView.getMessages().getCount(); } }); - assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getQueueSize()); + assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getMessages().getCount()); // memory check - assertEquals("memory usage is back to duck egg", 0, view.getMemoryPercentUsage()); - assertTrue("memory usage is increased ", 0 < dlqView.getMemoryPercentUsage()); + assertEquals("memory usage is back to duck egg", 0, this.getDestination(destination).getMemoryUsage().getPercentUsage()); + assertTrue("memory usage is increased ", 0 < this.getDestination(dlqDestination).getMemoryUsage().getPercentUsage()); // verify DLQ MessageConsumer dlqConsumer = createDlqConsumer(connection); @@ -244,13 +243,13 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.start(); producingThread.join(); - DestinationViewMBean view = createView(destination); - LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " - + view.getDequeueCount() + ", dequeues: " - + view.getDequeueCount() + ", dispatched: " - + view.getDispatchCount() + ", inflight: " - + view.getInFlightCount() + ", expiries: " - + view.getExpiredCount()); + DestinationStatistics view = getDestinationStatistics(destination); + LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + + view.getEnqueues().getCount() + ", dequeues: " + + view.getDequeues().getCount() + ", dispatched: " + + view.getDispatched().getCount() + ", inflight: " + + view.getInflight().getCount() + ", expiries: " + + view.getExpired().getCount()); LOG.info("stopping broker"); broker.stop(); @@ -264,26 +263,21 @@ public class ExpiredMessagesTest extends CombinationTestSupport { Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { - boolean result = false; - try { - DestinationViewMBean view = createView(destination); - LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " - + view.getDequeueCount() + ", dequeues: " - + view.getDequeueCount() + ", dispatched: " - + view.getDispatchCount() + ", inflight: " - + view.getInFlightCount() + ", expiries: " - + view.getExpiredCount()); - - result = view.getQueueSize() == 0; - } catch (Exception notFoundExpectedOnSlowMachines) { - } - return result; + DestinationStatistics view = getDestinationStatistics(destination); + LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + + view.getEnqueues().getCount() + ", dequeues: " + + view.getDequeues().getCount() + ", dispatched: " + + view.getDispatched().getCount() + ", inflight: " + + view.getInflight().getCount() + ", expiries: " + + view.getExpired().getCount()); + + return view.getMessages().getCount() == 0; } }); - view = createView(destination); - assertEquals("Expect empty queue, QueueSize: ", 0, view.getQueueSize()); - assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount()); + view = getDestinationStatistics(destination); + assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); + assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); } private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { @@ -308,20 +302,27 @@ public class ExpiredMessagesTest extends CombinationTestSupport { broker.start(); broker.waitUntilStarted(); return broker; - } - - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { - String domain = "org.apache.activemq"; - ObjectName name; - if (destination.isQueue()) { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" - + destination.getPhysicalName()); - } else { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" - + destination.getPhysicalName()); + } + + private DestinationStatistics getDestinationStatistics(ActiveMQDestination destination) { + DestinationStatistics result = null; + org.apache.activemq.broker.region.Destination dest = getDestination(destination); + if (dest != null) { + result = dest.getDestinationStatistics(); } - return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, - true); + return result; + } + + private org.apache.activemq.broker.region.Destination getDestination(ActiveMQDestination destination) { + org.apache.activemq.broker.region.Destination result = null; + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + if (dest.getName().equals(destination.getPhysicalName())) { + result = dest; + break; + } + } + return result; } protected void tearDown() throws Exception {