From dbe847e0dade437ba678299f62e899f61b95a3e9 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 9 Jul 2009 16:26:42 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2322 - test and correction git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@792598 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 4 +- .../usecases/ExpiredMessagesTest.java | 131 +++++++++++++++--- .../ExpiredMessagesWithNoConsumerTest.java | 2 +- 3 files changed, 116 insertions(+), 21 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 78d538cb87..bcb1867f39 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -205,7 +205,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Message could have expired while it was being // loaded.. if (broker.isExpired(message)) { - messageExpired(createConnectionContext(), message); + messageExpired(createConnectionContext(), createMessageReference(message)); + // drop message will decrement so counter balance here + destinationStatistics.getMessages().increment(); return true; } if (hasSpace()) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 2e981c8b44..396aaa22da 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.usecases; +import java.io.File; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -34,8 +37,10 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +54,9 @@ public class ExpiredMessagesTest extends CombinationTestSupport { MessageProducer producer; MessageConsumer consumer; public ActiveMQDestination destination = new ActiveMQQueue("test"); - + public boolean useTextMessage = true; + public boolean useVMCursor = true; + public static Test suite() { return suite(ExpiredMessagesTest.class); } @@ -59,21 +66,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setBrokerName("localhost"); - broker.setDataDirectory("data/"); - broker.setUseJmx(true); - broker.deleteAllMessages(); - - PolicyEntry defaultPolicy = new PolicyEntry(); - defaultPolicy.setExpireMessagesPeriod(100); - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(defaultPolicy); - broker.setDestinationPolicy(policyMap); - - broker.addConnector("tcp://localhost:61616"); - broker.start(); - broker.waitUntilStarted(); + final boolean deleteAllMessages = true; + broker = createBroker(deleteAllMessages, 100); } public void testExpiredMessages() throws Exception { @@ -129,8 +123,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.join(); session.close(); - Thread.sleep(5000); - + Thread.sleep(2000); + DestinationViewMBean view = createView(destination); LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); @@ -145,8 +139,107 @@ public class ExpiredMessagesTest extends CombinationTestSupport { + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount()); } + - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { + public void initCombosForTestRecoverExpiredMessages() { + addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } + + public void testRecoverExpiredMessages() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "failover://tcp://localhost:61616"); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(2000); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Thread producingThread = new Thread("Producing Thread") { + public void run() { + try { + int i = 0; + while (i++ < 1000) { + Message message = useTextMessage ? session + .createTextMessage("test") : session + .createObjectMessage("test"); + producer.send(message); + } + producer.close(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + producingThread.join(); + + DestinationViewMBean view = createView(destination); + LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " + + view.getDequeueCount() + ", dequeues: " + + view.getDequeueCount() + ", dispatched: " + + view.getDispatchCount() + ", inflight: " + + view.getInFlightCount() + ", expiries: " + + view.getExpiredCount()); + + LOG.info("stopping broker"); + broker.stop(); + broker.waitUntilStopped(); + + Thread.sleep(5000); + + LOG.info("recovering broker"); + final boolean deleteAllMessages = false; + broker = createBroker(deleteAllMessages, 5000); + + view = createView(destination); + LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " + + view.getDequeueCount() + ", dequeues: " + + view.getDequeueCount() + ", dispatched: " + + view.getDispatchCount() + ", inflight: " + + view.getInFlightCount() + ", expiries: " + + view.getExpiredCount()); + + long expiry = System.currentTimeMillis() + 30000; + while (view.getQueueSize() > 0 && System.currentTimeMillis() < expiry) { + Thread.sleep(500); + } + LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: " + + view.getDequeueCount() + ", dequeues: " + + view.getDequeueCount() + ", dispatched: " + + view.getDispatchCount() + ", inflight: " + + view.getInFlightCount() + ", expiries: " + + view.getExpiredCount()); + assertEquals("Wrong QueueSize: ", 0, view.getQueueSize()); + assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount()); + } + + private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("localhost"); + AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter(); + adaptor.setDirectory(new File("data/")); + adaptor.setForceRecoverReferenceStore(true); + broker.setPersistenceAdapter(adaptor); + + PolicyEntry defaultPolicy = new PolicyEntry(); + if (useVMCursor) { + defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + } + defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + broker.setDestinationPolicy(policyMap); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + broker.waitUntilStarted(); + return broker; + } + + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); String domain = "org.apache.activemq"; ObjectName name; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index d26c39d005..8e9ed8be11 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -137,7 +137,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { assertTrue("producer completed within time ", !producingThread.isAlive()); - Thread.sleep(2*expiryPeriod); + Thread.sleep(3*expiryPeriod); DestinationViewMBean view = createView(destination); assertEquals("All sent have expired ", sendCount, view.getExpiredCount()); }