diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 20a1770a1f..23a917c375 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -451,8 +451,6 @@ **/AMQDeadlockTest3.* **/AMQ1936Test.* - - **/MessageExpirationReaperTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 39fd6cf36b..a4158ec277 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -89,6 +89,10 @@ public class DestinationView implements DestinationViewMBean { public long getInFlightCount() { return destination.getDestinationStatistics().getInflight().getCount(); } + + public long getExpiredCount() { + return destination.getDestinationStatistics().getExpired().getCount(); + } public long getConsumerCount() { return destination.getDestinationStatistics().getConsumers().getCount(); @@ -363,4 +367,5 @@ public class DestinationView implements DestinationViewMBean { } return answer; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 9a6ecfa86f..41502425d0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -73,6 +73,14 @@ public interface DestinationViewMBean { */ long getInFlightCount(); + + /** + * Returns the number of messages that have expired + * + * @return The number of messages that have expired + */ + long getExpiredCount(); + /** * Returns the number of consumers subscribed this destination. * diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 62fe4e888f..fa04bb0ccb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -44,6 +44,7 @@ public abstract class BaseDestination implements Destination { */ public static final int MAX_PAGE_SIZE = 200; public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; + public static final long EXPIRE_MESSAGE_PERIOD = 30*1000; protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; @@ -69,6 +70,8 @@ public abstract class BaseDestination implements Destination { protected final BrokerService brokerService; protected final Broker regionBroker; protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; + protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; + private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; /** * @param broker @@ -213,7 +216,23 @@ public abstract class BaseDestination implements Destination { public void setMaxBrowsePageSize(int maxPageSize) { this.maxBrowsePageSize = maxPageSize; } + + public int getMaxExpirePageSize() { + return this.maxExpirePageSize; + } + public void setMaxExpirePageSize(int maxPageSize) { + this.maxExpirePageSize = maxPageSize; + } + + public void setExpireMessagesPeriod(long expireMessagesPeriod) { + this.expireMessagesPeriod = expireMessagesPeriod; + } + + public long getExpireMessagesPeriod() { + return expireMessagesPeriod; + } + public boolean isUseCache() { return useCache; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 60fdc4d114..90dc404e70 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -21,7 +21,6 @@ import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.TimeStatisticImpl; -import org.apache.tools.ant.taskdefs.condition.IsReference; /** * The J2EE Statistics for the a Destination. @@ -38,6 +37,7 @@ public class DestinationStatistics extends StatsImpl { protected PollCountStatisticImpl messagesCached; protected CountStatisticImpl dispatched; protected CountStatisticImpl inflight; + protected CountStatisticImpl expired; protected TimeStatisticImpl processTime; public DestinationStatistics() { @@ -46,6 +46,8 @@ public class DestinationStatistics extends StatsImpl { dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination"); inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement"); + expired = new CountStatisticImpl("expired", "The number of messages that have expired"); + consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination"); consumers.setDoReset(false); producers = new CountStatisticImpl("producers", "The number of producers that that are publishing messages to the destination"); @@ -57,6 +59,7 @@ public class DestinationStatistics extends StatsImpl { addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); addStatistic("inflight", inflight); + addStatistic("expired", expired); addStatistic("consumers", consumers); addStatistic("producers", producers); addStatistic("messages", messages); @@ -76,6 +79,10 @@ public class DestinationStatistics extends StatsImpl { return inflight; } + public CountStatisticImpl getExpired() { + return expired; + } + public CountStatisticImpl getConsumers() { return consumers; } @@ -111,6 +118,7 @@ public class DestinationStatistics extends StatsImpl { dequeues.reset(); dispatched.reset(); inflight.reset(); + expired.reset(); } } @@ -120,6 +128,7 @@ public class DestinationStatistics extends StatsImpl { dispatched.setEnabled(enabled); dequeues.setEnabled(enabled); inflight.setEnabled(enabled); + expired.setEnabled(true); consumers.setEnabled(enabled); producers.setEnabled(enabled); messages.setEnabled(enabled); @@ -134,6 +143,7 @@ public class DestinationStatistics extends StatsImpl { dispatched.setParent(parent.dispatched); dequeues.setParent(parent.dequeues); inflight.setParent(parent.inflight); + expired.setParent(parent.expired); consumers.setParent(parent.consumers); producers.setParent(parent.producers); messagesCached.setParent(parent.messagesCached); @@ -144,6 +154,7 @@ public class DestinationStatistics extends StatsImpl { dispatched.setParent(null); dequeues.setParent(null); inflight.setParent(null); + expired.setParent(null); consumers.setParent(null); producers.setParent(null); messagesCached.setParent(null); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index e401507881..82a558149a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.AbstractList; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -66,6 +67,7 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.DeterministicTaskRunner; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -112,7 +114,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { wakeup(); } }; + private final Runnable expireMessagesTask = new Runnable() { + public void run() { + expireMessages(); + } + }; private final Object iteratingMutex = new Object() {}; + private static final Scheduler scheduler = Scheduler.getInstance(); private static final ComparatororderedCompare = new Comparator() { @@ -177,6 +185,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { this.taskRunner = new DeterministicTaskRunner(this.executor,this); } + + if (getExpireMessagesPeriod() > 0) { + scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); + } + super.initialize(); if (store != null) { // Restore the persistent messages. @@ -192,7 +205,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Message could have expired while it was being // loaded.. if (broker.isExpired(message)) { - messageExpired(createConnectionContext(), message); + messageExpired(createConnectionContext(), null, message, false); return true; } if (hasSpace()) { @@ -416,11 +429,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void run() { try { - // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { broker.messageExpired(context, message); + destinationStatistics.getExpired().increment(); } else { doMessageSend(producerExchange, message); } @@ -498,6 +511,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { throw new IOException( "Connection closed, send aborted."); } + LOG.debug(this + ", waiting for store space... msg: " + message); } message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); store.addMessage(context, message); @@ -516,8 +530,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // op, by that time the message could have expired.. if (broker.isExpired(message)) { broker.messageExpired(context, message); - //message not added to stats yet - //destinationStatistics.getMessages().decrement(); + destinationStatistics.getExpired().increment(); return; } sendMessage(context, message); @@ -537,9 +550,34 @@ public class Queue extends BaseDestination implements Task, UsageListener { sendMessage(context, message); } } + + private void expireMessages() { + LOG.info("expiring messages..."); - public void gc(){ - } + // just track the insertion count + List l = new AbstractList() { + int size = 0; + + @Override + public void add(int index, Message element) { + size++; + } + + @Override + public int size() { + return size; + } + + @Override + public Message get(int index) { + return null; + } + }; + doBrowse(true, l, getMaxBrowsePageSize()); + } + + public void gc(){ + } public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { messageConsumed(context, node); @@ -593,6 +631,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (this.executor != null) { this.executor.shutdownNow(); } + + LOG.info(toString() + ", canceling expireMessagesTask"); + scheduler.cancel(expireMessagesTask); + if (messages != null) { messages.stop(); } @@ -691,57 +733,74 @@ public class Queue extends BaseDestination implements Task, UsageListener { return result; } - public Message[] browse() { - int count = 0; + public Message[] browse() { List l = new ArrayList(); + doBrowse(false, l, getMaxBrowsePageSize()); + return l.toArray(new Message[l.size()]); + } + + public void doBrowse(boolean forcePageIn, List l, int max) { + final ConnectionContext connectionContext = createConnectionContext(); try { - pageInMessages(false); - synchronized (this.pagedInPendingDispatch) { - for (Iterator i = this.pagedInPendingDispatch - .iterator(); i.hasNext() - && count < getMaxBrowsePageSize();) { - l.add(i.next().getMessage()); - count++; - } - } - if (count < getMaxBrowsePageSize()) { - synchronized (pagedInMessages) { - for (Iterator i = this.pagedInMessages - .values().iterator(); i.hasNext() - && count < getMaxBrowsePageSize();) { - Message m = i.next().getMessage(); - if (l.contains(m) == false) { - l.add(m); - count++; - } + pageInMessages(forcePageIn); + List toExpire = new ArrayList(); + dispatchLock.lock(); + try { + synchronized (pagedInPendingDispatch) { + addAll(pagedInPendingDispatch, l, max, toExpire); + for (MessageReference ref : toExpire) { + pagedInPendingDispatch.remove(ref); + messageExpired(connectionContext, ref, false); } } - } - if (count < getMaxBrowsePageSize()) { - synchronized (messages) { - try { - messages.reset(); - while (messages.hasNext() - && count < getMaxBrowsePageSize()) { - MessageReference node = messages.next(); - messages.rollback(node.getMessageId()); - if (node != null) { - Message m = node.getMessage(); - if (l.contains(m) == false) { - l.add(m); - count++; + toExpire.clear(); + synchronized (pagedInMessages) { + addAll(pagedInMessages.values(), l, max, toExpire); + } + for (MessageReference ref : toExpire) { + messageExpired(connectionContext, ref, false); + } + + if (l.size() < getMaxBrowsePageSize()) { + synchronized (messages) { + try { + messages.reset(); + while (messages.hasNext() && l.size() < max) { + MessageReference node = messages.next(); + messages.rollback(node.getMessageId()); + if (node != null) { + if (broker.isExpired(node)) { + messageExpired(connectionContext, + createMessageReference(node.getMessage()), false); + } else if (l.contains(node.getMessage()) == false) { + l.add(node.getMessage()); + } } } + } finally { + messages.release(); } - } finally { - messages.release(); } } + } finally { + dispatchLock.unlock(); } } catch (Exception e) { - LOG.error("Problem retrieving message in browse() ", e); + LOG.error("Problem retrieving message for browse", e); + } + } + + private void addAll(Collection refs, + List l, int maxBrowsePageSize, List toExpire) throws Exception { + for (Iterator i = refs.iterator(); i.hasNext() + && l.size() < getMaxBrowsePageSize();) { + QueueMessageReference ref = i.next(); + if (broker.isExpired(ref)) { + toExpire.add(ref); + } else if (l.contains(ref.getMessage()) == false) { + l.add(ref.getMessage()); + } } - return l.toArray(new Message[l.size()]); } public Message getMessage(String id) { @@ -1190,22 +1249,26 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - public void messageExpired(ConnectionContext context,MessageReference reference) { - messageExpired(context,null,reference); + public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) { + messageExpired(context,null,reference, dispatched); } public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { + messageExpired(context, subs, reference, true); + } + + public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) { broker.messageExpired(context, reference); destinationStatistics.getDequeues().increment(); - destinationStatistics.getInflight().decrement(); + destinationStatistics.getExpired().increment(); + if (dispatched) { + destinationStatistics.getInflight().decrement(); + } try { removeMessage(context,subs,(QueueMessageReference)reference); } catch (IOException e) { LOG.error("Failed to remove expired Message from the store ",e); } - synchronized(pagedInMessages) { - pagedInMessages.remove(reference.getMessageId()); - } wakeup(); } @@ -1286,7 +1349,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { result.add(ref); count++; } else { - messageExpired(createConnectionContext(), ref); + messageExpired(createConnectionContext(), ref, false); } } } finally { @@ -1312,7 +1375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } return resultList; } - + private void doDispatch(List list) throws Exception { dispatchLock.lock(); try { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 26d1c1fdba..953b240401 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -709,6 +709,10 @@ public class RegionBroker extends EmptyBroker { BrokerSupport.resend(context,message, deadLetterDestination); } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Expired message with no DLQ strategy in place"); + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index ed475ec481..8b462ac5e8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -278,6 +278,7 @@ public class Topic extends BaseDestination implements Task{ // destination.. it may have expired. if (message.isExpired()) { broker.messageExpired(context, message); + getDestinationStatistics().getExpired().increment(); if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -306,6 +307,7 @@ public class Topic extends BaseDestination implements Task{ // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { + getDestinationStatistics().getExpired().increment(); broker.messageExpired(context, message); } else { doMessageSend(producerExchange, message); @@ -361,6 +363,7 @@ public class Topic extends BaseDestination implements Task{ // The usage manager could have delayed us by the time // we unblock the message could have expired.. if (message.isExpired()) { + getDestinationStatistics().getExpired().increment(); if (LOG.isDebugEnabled()) { LOG.debug("Expired message: " + message); } @@ -418,6 +421,7 @@ public class Topic extends BaseDestination implements Task{ // operration.. by that time the message could have // expired.. if (broker.isExpired(message)) { + getDestinationStatistics().getExpired().increment(); broker.messageExpired(context, message); message.decrementReferenceCount(); return; @@ -594,6 +598,7 @@ public class Topic extends BaseDestination implements Task{ broker.messageExpired(context, reference); destinationStatistics.getMessages().decrement(); destinationStatistics.getEnqueues().decrement(); + destinationStatistics.getExpired().increment(); MessageAck ack = new MessageAck(); ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setDestination(destination); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 9844801cce..f36e5554cc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -161,6 +161,7 @@ public class TopicSubscription extends AbstractSubscription { matched.remove(); dispatchedCounter.incrementAndGet(); node.decrementReferenceCount(); + node.getRegionDestination().getDestinationStatistics().getExpired().increment(); broker.messageExpired(getContext(), node); break; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 5c9a033d7e..4673255e02 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -73,15 +73,15 @@ public class PolicyEntry extends DestinationMapEntry { private boolean advisoryWhenFull; private boolean advisoryForDelivery; private boolean advisoryForConsumed; + private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; + private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; public void configure(Broker broker,Queue queue) { baseConfiguration(queue); if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } - if (deadLetterStrategy != null) { - queue.setDeadLetterStrategy(deadLetterStrategy); - } + queue.setDeadLetterStrategy(getDeadLetterStrategy()); queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); if (memoryLimit > 0) { queue.getMemoryUsage().setLimit(memoryLimit); @@ -104,9 +104,7 @@ public class PolicyEntry extends DestinationMapEntry { if (dispatchPolicy != null) { topic.setDispatchPolicy(dispatchPolicy); } - if (deadLetterStrategy != null) { - topic.setDeadLetterStrategy(deadLetterStrategy); - } + topic.setDeadLetterStrategy(getDeadLetterStrategy()); if (subscriptionRecoveryPolicy != null) { topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy()); } @@ -132,6 +130,8 @@ public class PolicyEntry extends DestinationMapEntry { destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); + destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); + destination.setMaxExpirePageSize(getMaxExpirePageSize()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -543,4 +543,21 @@ public class PolicyEntry extends DestinationMapEntry { this.advisdoryForFastProducers = advisdoryForFastProducers; } + public void setMaxExpirePageSize(int maxExpirePageSize) { + this.maxExpirePageSize = maxExpirePageSize; + } + + public int getMaxExpirePageSize() { + return maxExpirePageSize; + } + + public void setExpireMessagesPeriod(long expireMessagesPeriod) { + this.expireMessagesPeriod = expireMessagesPeriod; + } + + public long getExpireMessagesPeriod() { + return expireMessagesPeriod; + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 0493c2b156..cf9d63aeb6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -242,8 +242,8 @@ public abstract class Usage implements Service { LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory"); } - if (newPercentUsage >= 80) { - LOG.warn("Memory usage is now over 80%!"); + if (newPercentUsage >= 100) { + LOG.warn("Memory usage is now at " + newPercentUsage + "%"); } if (started.get()) { diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 1872a0b741..1b55c18d67 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -19,7 +19,6 @@ package org.apache.activemq; import java.lang.Thread.UncaughtExceptionHandler; import java.util.HashMap; import java.util.Map; -import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,8 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java index 0d9210494a..cd53f42345 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java @@ -1,6 +1,7 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import javax.jms.ConnectionFactory; import javax.jms.MessageProducer; @@ -16,6 +17,8 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.junit.After; import org.junit.Before; @@ -56,8 +59,16 @@ public class MessageExpirationReaperTest { broker = new BrokerService(); // broker.setPersistent(false); // broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); broker.setBrokerName(brokerName); broker.addConnector(brokerUrl); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(500); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); } @@ -85,15 +96,13 @@ public class MessageExpirationReaperTest { } // Let the messages expire - Thread.sleep(1000); + Thread.sleep(2000); DestinationViewMBean view = createView(destination); - /*################### CURRENT EXPECTED FAILURE ####################*/ - // The messages expire and should be reaped but they're not currently - // reaped until there is an active consumer placed on the queue - assertEquals("Incorrect count: " + view.getInFlightCount(), 0, view.getInFlightCount()); - + assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount()); + assertEquals("Incorrect queue size count", 0, view.getQueueSize()); + assertEquals("Incorrect expired size count", 3, view.getEnqueueCount()); // Send more messages with an expiration for (int i = 0; i < count; i++) { @@ -101,10 +110,13 @@ public class MessageExpirationReaperTest { producer.send(message); } + // Let the messages expire + Thread.sleep(2000); + // Simply browse the queue Session browserSession = createSession(); QueueBrowser browser = browserSession.createBrowser((Queue) destination); - browser.getEnumeration(); + assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements()); // The messages expire and should be reaped because of the presence of // the queue browser diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index baab3070e3..dfe1e3a56b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.usecases; +import java.util.concurrent.atomic.AtomicLong; + import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -33,17 +35,21 @@ import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class ExpiredMessagesTest extends CombinationTestSupport { + private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class); + BrokerService broker; Connection connection; Session session; MessageProducer producer; MessageConsumer consumer; - public ActiveMQDestination destination; + public ActiveMQDestination destination = new ActiveMQQueue("test"); public static Test suite() { return suite(ExpiredMessagesTest.class); @@ -77,6 +83,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producer.setTimeToLive(100); consumer = session.createConsumer(destination); connection.start(); + final AtomicLong received = new AtomicLong(); Thread consumerThread = new Thread("Consumer Thread") { public void run() { @@ -84,7 +91,9 @@ public class ExpiredMessagesTest extends CombinationTestSupport { try { long end = System.currentTimeMillis(); while (end - start < 3000) { - consumer.receive(1000); + if (consumer.receive(1000) != null) { + received.incrementAndGet(); + } Thread.sleep(100); end = System.currentTimeMillis(); } @@ -115,9 +124,13 @@ public class ExpiredMessagesTest extends CombinationTestSupport { consumerThread.join(); producingThread.join(); - DestinationViewMBean view = createView(destination); - assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount()); + DestinationViewMBean view = createView(destination); + LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + + assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount()); + //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount()); } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {