git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@792598 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-07-09 16:26:42 +00:00
parent a72f26f0df
commit dbe847e0da
3 changed files with 116 additions and 21 deletions

View File

@ -205,7 +205,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), message);
messageExpired(createConnectionContext(), createMessageReference(message));
// drop message will decrement so counter balance here
destinationStatistics.getMessages().increment();
return true;
}
if (hasSpace()) {

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -34,8 +37,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,7 +54,9 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
MessageProducer producer;
MessageConsumer consumer;
public ActiveMQDestination destination = new ActiveMQQueue("test");
public boolean useTextMessage = true;
public boolean useVMCursor = true;
public static Test suite() {
return suite(ExpiredMessagesTest.class);
}
@ -59,21 +66,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
}
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setDataDirectory("data/");
broker.setUseJmx(true);
broker.deleteAllMessages();
PolicyEntry defaultPolicy = new PolicyEntry();
defaultPolicy.setExpireMessagesPeriod(100);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);
broker.addConnector("tcp://localhost:61616");
broker.start();
broker.waitUntilStarted();
final boolean deleteAllMessages = true;
broker = createBroker(deleteAllMessages, 100);
}
public void testExpiredMessages() throws Exception {
@ -129,8 +123,8 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producingThread.join();
session.close();
Thread.sleep(5000);
Thread.sleep(2000);
DestinationViewMBean view = createView(destination);
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
@ -145,8 +139,107 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
public void initCombosForTestRecoverExpiredMessages() {
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testRecoverExpiredMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(2000);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
int i = 0;
while (i++ < 1000) {
Message message = useTextMessage ? session
.createTextMessage("test") : session
.createObjectMessage("test");
producer.send(message);
}
producer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
}
};
producingThread.start();
producingThread.join();
DestinationViewMBean view = createView(destination);
LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ view.getDequeueCount() + ", dequeues: "
+ view.getDequeueCount() + ", dispatched: "
+ view.getDispatchCount() + ", inflight: "
+ view.getInFlightCount() + ", expiries: "
+ view.getExpiredCount());
LOG.info("stopping broker");
broker.stop();
broker.waitUntilStopped();
Thread.sleep(5000);
LOG.info("recovering broker");
final boolean deleteAllMessages = false;
broker = createBroker(deleteAllMessages, 5000);
view = createView(destination);
LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ view.getDequeueCount() + ", dequeues: "
+ view.getDequeueCount() + ", dispatched: "
+ view.getDispatchCount() + ", inflight: "
+ view.getInFlightCount() + ", expiries: "
+ view.getExpiredCount());
long expiry = System.currentTimeMillis() + 30000;
while (view.getQueueSize() > 0 && System.currentTimeMillis() < expiry) {
Thread.sleep(500);
}
LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+ view.getDequeueCount() + ", dequeues: "
+ view.getDequeueCount() + ", dispatched: "
+ view.getDispatchCount() + ", inflight: "
+ view.getInFlightCount() + ", expiries: "
+ view.getExpiredCount());
assertEquals("Wrong QueueSize: ", 0, view.getQueueSize());
assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount());
}
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("localhost");
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setDirectory(new File("data/"));
adaptor.setForceRecoverReferenceStore(true);
broker.setPersistenceAdapter(adaptor);
PolicyEntry defaultPolicy = new PolicyEntry();
if (useVMCursor) {
defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
}
defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.addConnector("tcp://localhost:61616");
broker.start();
broker.waitUntilStarted();
return broker;
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
ObjectName name;

View File

@ -137,7 +137,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertTrue("producer completed within time ", !producingThread.isAlive());
Thread.sleep(2*expiryPeriod);
Thread.sleep(3*expiryPeriod);
DestinationViewMBean view = createView(destination);
assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
}