first cut of resolution to deterministic expiry https://issues.apache.org/activemq/browse/AMQ-1112 - default period is 30 seconds, destination policy entry allows it to be specified or turned off (0)

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@790880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-07-03 12:02:07 +00:00
parent e583839810
commit c6a485f90b
14 changed files with 231 additions and 78 deletions

View File

@ -451,8 +451,6 @@
<exclude>**/AMQDeadlockTest3.*</exclude>
<exclude>**/AMQ1936Test.*</exclude>
<!-- excluding it until the issue is fixed (AMQ-1112), so we can have successful builds -->
<exclude>**/MessageExpirationReaperTest.*</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -89,6 +89,10 @@ public class DestinationView implements DestinationViewMBean {
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
public long getExpiredCount() {
return destination.getDestinationStatistics().getExpired().getCount();
}
public long getConsumerCount() {
return destination.getDestinationStatistics().getConsumers().getCount();
@ -363,4 +367,5 @@ public class DestinationView implements DestinationViewMBean {
}
return answer;
}
}

View File

@ -73,6 +73,14 @@ public interface DestinationViewMBean {
*/
long getInFlightCount();
/**
* Returns the number of messages that have expired
*
* @return The number of messages that have expired
*/
long getExpiredCount();
/**
* Returns the number of consumers subscribed this destination.
*

View File

@ -44,6 +44,7 @@ public abstract class BaseDestination implements Destination {
*/
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@ -69,6 +70,8 @@ public abstract class BaseDestination implements Destination {
protected final BrokerService brokerService;
protected final Broker regionBroker;
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
/**
* @param broker
@ -213,7 +216,23 @@ public abstract class BaseDestination implements Destination {
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
public int getMaxExpirePageSize() {
return this.maxExpirePageSize;
}
public void setMaxExpirePageSize(int maxPageSize) {
this.maxExpirePageSize = maxPageSize;
}
public void setExpireMessagesPeriod(long expireMessagesPeriod) {
this.expireMessagesPeriod = expireMessagesPeriod;
}
public long getExpireMessagesPeriod() {
return expireMessagesPeriod;
}
public boolean isUseCache() {
return useCache;
}

View File

@ -21,7 +21,6 @@ import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.TimeStatisticImpl;
import org.apache.tools.ant.taskdefs.condition.IsReference;
/**
* The J2EE Statistics for the a Destination.
@ -38,6 +37,7 @@ public class DestinationStatistics extends StatsImpl {
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
public DestinationStatistics() {
@ -46,6 +46,8 @@ public class DestinationStatistics extends StatsImpl {
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
consumers.setDoReset(false);
producers = new CountStatisticImpl("producers", "The number of producers that that are publishing messages to the destination");
@ -57,6 +59,7 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("inflight", inflight);
addStatistic("expired", expired);
addStatistic("consumers", consumers);
addStatistic("producers", producers);
addStatistic("messages", messages);
@ -76,6 +79,10 @@ public class DestinationStatistics extends StatsImpl {
return inflight;
}
public CountStatisticImpl getExpired() {
return expired;
}
public CountStatisticImpl getConsumers() {
return consumers;
}
@ -111,6 +118,7 @@ public class DestinationStatistics extends StatsImpl {
dequeues.reset();
dispatched.reset();
inflight.reset();
expired.reset();
}
}
@ -120,6 +128,7 @@ public class DestinationStatistics extends StatsImpl {
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
consumers.setEnabled(enabled);
producers.setEnabled(enabled);
messages.setEnabled(enabled);
@ -134,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
consumers.setParent(parent.consumers);
producers.setParent(parent.producers);
messagesCached.setParent(parent.messagesCached);
@ -144,6 +154,7 @@ public class DestinationStatistics extends StatsImpl {
dispatched.setParent(null);
dequeues.setParent(null);
inflight.setParent(null);
expired.setParent(null);
consumers.setParent(null);
producers.setParent(null);
messagesCached.setParent(null);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -66,6 +67,7 @@ import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -112,7 +114,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
wakeup();
}
};
private final Runnable expireMessagesTask = new Runnable() {
public void run() {
expireMessages();
}
};
private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance();
private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
@ -177,6 +185,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
}
if (getExpireMessagesPeriod() > 0) {
scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
super.initialize();
if (store != null) {
// Restore the persistent messages.
@ -192,7 +205,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), message);
messageExpired(createConnectionContext(), null, message, false);
return true;
}
if (hasSpace()) {
@ -416,11 +429,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void run() {
try {
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
destinationStatistics.getExpired().increment();
} else {
doMessageSend(producerExchange, message);
}
@ -498,6 +511,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
throw new IOException(
"Connection closed, send aborted.");
}
LOG.debug(this + ", waiting for store space... msg: " + message);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message);
@ -516,8 +530,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
destinationStatistics.getExpired().increment();
return;
}
sendMessage(context, message);
@ -537,9 +550,34 @@ public class Queue extends BaseDestination implements Task, UsageListener {
sendMessage(context, message);
}
}
private void expireMessages() {
LOG.info("expiring messages...");
public void gc(){
}
// just track the insertion count
List<Message> l = new AbstractList<Message>() {
int size = 0;
@Override
public void add(int index, Message element) {
size++;
}
@Override
public int size() {
return size;
}
@Override
public Message get(int index) {
return null;
}
};
doBrowse(true, l, getMaxBrowsePageSize());
}
public void gc(){
}
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
messageConsumed(context, node);
@ -593,6 +631,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (this.executor != null) {
this.executor.shutdownNow();
}
LOG.info(toString() + ", canceling expireMessagesTask");
scheduler.cancel(expireMessagesTask);
if (messages != null) {
messages.stop();
}
@ -691,57 +733,74 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return result;
}
public Message[] browse() {
int count = 0;
public Message[] browse() {
List<Message> l = new ArrayList<Message>();
doBrowse(false, l, getMaxBrowsePageSize());
return l.toArray(new Message[l.size()]);
}
public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
final ConnectionContext connectionContext = createConnectionContext();
try {
pageInMessages(false);
synchronized (this.pagedInPendingDispatch) {
for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
.iterator(); i.hasNext()
&& count < getMaxBrowsePageSize();) {
l.add(i.next().getMessage());
count++;
}
}
if (count < getMaxBrowsePageSize()) {
synchronized (pagedInMessages) {
for (Iterator<QueueMessageReference> i = this.pagedInMessages
.values().iterator(); i.hasNext()
&& count < getMaxBrowsePageSize();) {
Message m = i.next().getMessage();
if (l.contains(m) == false) {
l.add(m);
count++;
}
pageInMessages(forcePageIn);
List<MessageReference> toExpire = new ArrayList<MessageReference>();
dispatchLock.lock();
try {
synchronized (pagedInPendingDispatch) {
addAll(pagedInPendingDispatch, l, max, toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
messageExpired(connectionContext, ref, false);
}
}
}
if (count < getMaxBrowsePageSize()) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()
&& count < getMaxBrowsePageSize()) {
MessageReference node = messages.next();
messages.rollback(node.getMessageId());
if (node != null) {
Message m = node.getMessage();
if (l.contains(m) == false) {
l.add(m);
count++;
toExpire.clear();
synchronized (pagedInMessages) {
addAll(pagedInMessages.values(), l, max, toExpire);
}
for (MessageReference ref : toExpire) {
messageExpired(connectionContext, ref, false);
}
if (l.size() < getMaxBrowsePageSize()) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext() && l.size() < max) {
MessageReference node = messages.next();
messages.rollback(node.getMessageId());
if (node != null) {
if (broker.isExpired(node)) {
messageExpired(connectionContext,
createMessageReference(node.getMessage()), false);
} else if (l.contains(node.getMessage()) == false) {
l.add(node.getMessage());
}
}
}
} finally {
messages.release();
}
} finally {
messages.release();
}
}
} finally {
dispatchLock.unlock();
}
} catch (Exception e) {
LOG.error("Problem retrieving message in browse() ", e);
LOG.error("Problem retrieving message for browse", e);
}
}
private void addAll(Collection<QueueMessageReference> refs,
List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
&& l.size() < getMaxBrowsePageSize();) {
QueueMessageReference ref = i.next();
if (broker.isExpired(ref)) {
toExpire.add(ref);
} else if (l.contains(ref.getMessage()) == false) {
l.add(ref.getMessage());
}
}
return l.toArray(new Message[l.size()]);
}
public Message getMessage(String id) {
@ -1190,22 +1249,26 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
public void messageExpired(ConnectionContext context,MessageReference reference) {
messageExpired(context,null,reference);
public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) {
messageExpired(context,null,reference, dispatched);
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
messageExpired(context, subs, reference, true);
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) {
broker.messageExpired(context, reference);
destinationStatistics.getDequeues().increment();
destinationStatistics.getInflight().decrement();
destinationStatistics.getExpired().increment();
if (dispatched) {
destinationStatistics.getInflight().decrement();
}
try {
removeMessage(context,subs,(QueueMessageReference)reference);
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
wakeup();
}
@ -1286,7 +1349,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
result.add(ref);
count++;
} else {
messageExpired(createConnectionContext(), ref);
messageExpired(createConnectionContext(), ref, false);
}
}
} finally {
@ -1312,7 +1375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {
dispatchLock.lock();
try {

View File

@ -709,6 +709,10 @@ public class RegionBroker extends EmptyBroker {
BrokerSupport.resend(context,message,
deadLetterDestination);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message with no DLQ strategy in place");
}
}
}
}

View File

@ -278,6 +278,7 @@ public class Topic extends BaseDestination implements Task{
// destination.. it may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
getDestinationStatistics().getExpired().increment();
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@ -306,6 +307,7 @@ public class Topic extends BaseDestination implements Task{
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
} else {
doMessageSend(producerExchange, message);
@ -361,6 +363,7 @@ public class Topic extends BaseDestination implements Task{
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
getDestinationStatistics().getExpired().increment();
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
@ -418,6 +421,7 @@ public class Topic extends BaseDestination implements Task{
// operration.. by that time the message could have
// expired..
if (broker.isExpired(message)) {
getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
message.decrementReferenceCount();
return;
@ -594,6 +598,7 @@ public class Topic extends BaseDestination implements Task{
broker.messageExpired(context, reference);
destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
destinationStatistics.getExpired().increment();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);

View File

@ -161,6 +161,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove();
dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
node.getRegionDestination().getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node);
break;
}

View File

@ -73,15 +73,15 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
public void configure(Broker broker,Queue queue) {
baseConfiguration(queue);
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
if (deadLetterStrategy != null) {
queue.setDeadLetterStrategy(deadLetterStrategy);
}
queue.setDeadLetterStrategy(getDeadLetterStrategy());
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
queue.getMemoryUsage().setLimit(memoryLimit);
@ -104,9 +104,7 @@ public class PolicyEntry extends DestinationMapEntry {
if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy);
}
if (deadLetterStrategy != null) {
topic.setDeadLetterStrategy(deadLetterStrategy);
}
topic.setDeadLetterStrategy(getDeadLetterStrategy());
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
@ -132,6 +130,8 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
destination.setMaxExpirePageSize(getMaxExpirePageSize());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -543,4 +543,21 @@ public class PolicyEntry extends DestinationMapEntry {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
public void setMaxExpirePageSize(int maxExpirePageSize) {
this.maxExpirePageSize = maxExpirePageSize;
}
public int getMaxExpirePageSize() {
return maxExpirePageSize;
}
public void setExpireMessagesPeriod(long expireMessagesPeriod) {
this.expireMessagesPeriod = expireMessagesPeriod;
}
public long getExpireMessagesPeriod() {
return expireMessagesPeriod;
}
}

View File

@ -242,8 +242,8 @@ public abstract class Usage<T extends Usage> implements Service {
LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: "
+ newPercentUsage + "% of available memory");
}
if (newPercentUsage >= 80) {
LOG.warn("Memory usage is now over 80%!");
if (newPercentUsage >= 100) {
LOG.warn("Memory usage is now at " + newPercentUsage + "%");
}
if (started.get()) {

View File

@ -19,7 +19,6 @@ package org.apache.activemq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -28,8 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

View File

@ -1,6 +1,7 @@
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
@ -16,6 +17,8 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
@ -56,8 +59,16 @@ public class MessageExpirationReaperTest {
broker = new BrokerService();
// broker.setPersistent(false);
// broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(brokerName);
broker.addConnector(brokerUrl);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(500);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
broker.start();
}
@ -85,15 +96,13 @@ public class MessageExpirationReaperTest {
}
// Let the messages expire
Thread.sleep(1000);
Thread.sleep(2000);
DestinationViewMBean view = createView(destination);
/*################### CURRENT EXPECTED FAILURE ####################*/
// The messages expire and should be reaped but they're not currently
// reaped until there is an active consumer placed on the queue
assertEquals("Incorrect count: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Incorrect queue size count", 0, view.getQueueSize());
assertEquals("Incorrect expired size count", 3, view.getEnqueueCount());
// Send more messages with an expiration
for (int i = 0; i < count; i++) {
@ -101,10 +110,13 @@ public class MessageExpirationReaperTest {
producer.send(message);
}
// Let the messages expire
Thread.sleep(2000);
// Simply browse the queue
Session browserSession = createSession();
QueueBrowser browser = browserSession.createBrowser((Queue) destination);
browser.getEnumeration();
assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
// The messages expire and should be reaped because of the presence of
// the queue browser

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.usecases;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -33,17 +35,21 @@ import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ExpiredMessagesTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
BrokerService broker;
Connection connection;
Session session;
MessageProducer producer;
MessageConsumer consumer;
public ActiveMQDestination destination;
public ActiveMQDestination destination = new ActiveMQQueue("test");
public static Test suite() {
return suite(ExpiredMessagesTest.class);
@ -77,6 +83,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
producer.setTimeToLive(100);
consumer = session.createConsumer(destination);
connection.start();
final AtomicLong received = new AtomicLong();
Thread consumerThread = new Thread("Consumer Thread") {
public void run() {
@ -84,7 +91,9 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
try {
long end = System.currentTimeMillis();
while (end - start < 3000) {
consumer.receive(1000);
if (consumer.receive(1000) != null) {
received.incrementAndGet();
}
Thread.sleep(100);
end = System.currentTimeMillis();
}
@ -115,9 +124,13 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
consumerThread.join();
producingThread.join();
DestinationViewMBean view = createView(destination);
assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
DestinationViewMBean view = createView(destination);
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
//assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {