From ca242f4a34020d7ccbe2f7c8fe295cdafa482fd1 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 14 Jul 2009 12:59:19 +0000 Subject: [PATCH] resolve some issue with duplicate expiry due to concurrent execution with expiry task and dispatch/acks etc. some more tests for stats included - https://issues.apache.org/activemq/browse/AMQ-1112 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@793892 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 32 ++----- .../apache/activemq/broker/region/Queue.java | 19 +++- .../broker/region/QueueSubscription.java | 6 ++ .../activemq/broker/region/RegionBroker.java | 32 +++++-- .../activemq/command/ActiveMQMessage.java | 8 -- .../org/apache/activemq/command/Message.java | 14 ++- .../usecases/ExpiredMessagesTest.java | 50 +++++++--- .../ExpiredMessagesWithNoConsumerTest.java | 93 ++++++++++++------- .../java/org/apache/activemq/util/Wait.java | 40 ++++++++ 9 files changed, 196 insertions(+), 98 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/util/Wait.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 515b8e9f0f..d4c26fed69 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -222,9 +222,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // Don't remove the nodes until we are committed. if (!context.isInTransaction()) { dequeueCounter++; - if (!this.getConsumerInfo().isBrowser()) { - node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); - } node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); removeList.add(node); } else { @@ -238,7 +235,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { synchronized(dispatchLock) { dequeueCounter++; dispatched.remove(node); - node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); prefetchExtension--; } @@ -287,7 +283,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { MessageId messageId = node.getMessageId(); if (ack.getLastMessageId().equals(messageId)) { // this should never be within a transaction - node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); destination = node.getRegionDestination(); acknowledge(context, ack, node); @@ -303,16 +298,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription { int index = 0; for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { final MessageReference node = iter.next(); - if (hasNotAlreadyExpired(node)) { - if (node.isExpired()) { + if (node.isExpired()) { + if (broker.isExpired(node)) { node.getRegionDestination().messageExpired(context, this, node); - dispatched.remove(node); - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } - } else { - // already expired dispatched.remove(node); - node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); + node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } if (ack.getLastMessageId().equals(node.getMessageId())) { prefetchExtension = Math.max(prefetchExtension, index + 1); @@ -373,9 +364,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } if (inAckRange) { sendToDLQ(context, node); - node.getRegionDestination().getDestinationStatistics() - .getDequeues().increment(); - node.getRegionDestination().getDestinationStatistics() .getInflight().increment(); @@ -418,16 +406,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } - private boolean hasNotAlreadyExpired(MessageReference node) { - boolean hasNotExpired = true; - try { - hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION) == null; - } catch (IOException e) { - LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION + " for " + node, e); - } - return hasNotExpired; - } - /** * Checks an ack versus the contents of the dispatched list. * @@ -610,7 +588,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { //increment number to dispatch numberToDispatch++; - node.getRegionDestination().messageExpired(context, this, node); + if (broker.isExpired(node)) { + node.getRegionDestination().messageExpired(context, this, node); + } continue; } dispatch(node); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index bcb1867f39..b32695c794 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -760,7 +760,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { addAll(pagedInPendingDispatch, l, max, toExpire); for (MessageReference ref : toExpire) { pagedInPendingDispatch.remove(ref); - messageExpired(connectionContext, ref); + if (broker.isExpired(ref)) { + messageExpired(connectionContext, ref); + } } } toExpire.clear(); @@ -768,7 +770,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { addAll(pagedInMessages.values(), l, max, toExpire); } for (MessageReference ref : toExpire) { - messageExpired(connectionContext, ref); + if (broker.isExpired(ref)) { + messageExpired(connectionContext, ref); + } else { + synchronized (pagedInMessages) { + pagedInMessages.remove(ref.getMessageId()); + } + } } if (l.size() < getMaxBrowsePageSize()) { @@ -805,7 +813,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { for (Iterator i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { QueueMessageReference ref = i.next(); - if (broker.isExpired(ref)) { + if (ref.isExpired()) { toExpire.add(ref); } else if (l.contains(ref.getMessage()) == false) { l.add(ref.getMessage()); @@ -1224,6 +1232,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // This sends the ack the the journal.. if (!ack.isInTransaction()) { acknowledge(context, sub, ack, reference); + getDestinationStatistics().getDequeues().increment(); dropMessage(reference); } else { try { @@ -1232,6 +1241,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { context.getTransaction().addSynchronization(new Synchronization() { public void afterCommit() throws Exception { + getDestinationStatistics().getDequeues().increment(); dropMessage(reference); wakeup(); } @@ -1264,11 +1274,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { - if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("message expired: " + reference); } broker.messageExpired(context, reference); - destinationStatistics.getDequeues().increment(); destinationStatistics.getExpired().increment(); try { removeMessage(context,subs,(QueueMessageReference)reference); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index fbba6d0a25..f37a4d8488 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -45,6 +45,12 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner * @throws IOException */ protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { + if (n.isExpired()) { + if (!broker.isExpired(n)) { + LOG.info("ignoring ack " + ack + ", for already expired message: " + n); + return; + } + } final Destination q = n.getRegionDestination(); final QueueMessageReference node = (QueueMessageReference)n; final Queue queue = (Queue)q; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index e859db5c04..97f3b0673a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -664,7 +664,30 @@ public class RegionBroker extends EmptyBroker { } public boolean isExpired(MessageReference messageReference) { - return messageReference.isExpired(); + boolean expired = false; + if (messageReference.isExpired()) { + try { + // prevent duplicate expiry processing + Message message = messageReference.getMessage(); + synchronized (message) { + expired = stampAsExpired(message); + } + } catch (IOException e) { + LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e); + } + } + return expired; + } + + private boolean stampAsExpired(Message message) throws IOException { + boolean stamped=false; + if (message.getProperty(ORIGINAL_EXPIRATION) == null) { + long expiration=message.getExpiration(); + message.setExpiration(0); + message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration)); + stamped = true; + } + return stamped; } public void messageExpired(ConnectionContext context, MessageReference node) { @@ -679,7 +702,8 @@ public class RegionBroker extends EmptyBroker { try{ if(node!=null){ Message message=node.getMessage(); - if(message!=null&&node.getRegionDestination()!=null){ + stampAsExpired(message); + if(message!=null && node.getRegionDestination()!=null){ DeadLetterStrategy deadLetterStrategy=node .getRegionDestination().getDeadLetterStrategy(); if(deadLetterStrategy!=null){ @@ -688,10 +712,6 @@ public class RegionBroker extends EmptyBroker { // message may be inflight to other subscriptions so do not modify message = message.copy(); } - long expiration=message.getExpiration(); - message.setExpiration(0); - message.setProperty(ORIGINAL_EXPIRATION,new Long( - expiration)); if(!message.isPersistent()){ message.setPersistent(true); message.setProperty("originalDeliveryMode", diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java index 3f3810d329..15e9f3e13e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java @@ -611,14 +611,6 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess } } - public boolean isExpired() { - long expireTime = this.getExpiration(); - if (expireTime > 0 && System.currentTimeMillis() > expireTime) { - return true; - } - return false; - } - public Callback getAcknowledgeCallback() { return acknowledgeCallback; } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index c28ed170f9..e35b8c23b2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -88,6 +88,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess private transient ActiveMQConnection connection; private transient org.apache.activemq.broker.region.Destination regionDestination; private transient MemoryUsage memoryUsage; + private transient boolean expired; private BrokerId[] brokerPath; private BrokerId[] cluster; @@ -338,6 +339,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess public void setExpiration(long expiration) { this.expiration = expiration; + if (this.expiration > 0) { + expired = false; + } } /** @@ -435,11 +439,13 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess } public boolean isExpired() { - long expireTime = getExpiration(); - if (expireTime > 0 && System.currentTimeMillis() > expireTime) { - return true; + if (!expired) { + long expireTime = getExpiration(); + if (expireTime > 0 && System.currentTimeMillis() > expireTime) { + expired = true; + } } - return false; + return expired; } public boolean isAdvisory() { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 396aaa22da..0645973921 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -41,6 +41,7 @@ import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePoli import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,7 +108,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { public void run() { try { int i = 0; - while (i++ < 30000) { + while (i++ < 10000) { producer.send(session.createTextMessage("test")); } producer.close(); @@ -123,21 +124,41 @@ public class ExpiredMessagesTest extends CombinationTestSupport { producingThread.join(); session.close(); - Thread.sleep(2000); - - DestinationViewMBean view = createView(destination); - LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + final DestinationViewMBean view = createView(destination); - assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount()); - - long expiry = System.currentTimeMillis() + 30000; - while (view.getInFlightCount() > 0 && System.currentTimeMillis() < expiry) { - Thread.sleep(500); - } - LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() - + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + // wait for all to inflight to expire + assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return view.getInFlightCount() == 0; + } + })); assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount()); + + LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + + // wait for all sent to get delivered and expire + assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + long oldEnqueues = view.getEnqueueCount(); + Thread.sleep(200); + LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + return oldEnqueues == view.getEnqueueCount(); + } + }, 60*1000)); + + + LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); + + assertTrue("got at least what did not expire", received.get() >= view.getDequeueCount() - view.getExpiredCount()); + + assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return view.getQueueSize() == 0; + } + })); } @@ -229,6 +250,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); } defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod); + defaultPolicy.setMaxExpirePageSize(1200); PolicyMap policyMap = new PolicyMap(); policyMap.setDefaultEntry(defaultPolicy); broker.setDestinationPolicy(policyMap); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java index 8e9ed8be11..f9ab03c76f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,7 +48,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(ExpiredMessagesWithNoConsumerTest.class); - private static final int expiryPeriod = 1000; BrokerService broker; Connection connection; @@ -81,8 +81,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setExpireMessagesPeriod(expiryPeriod); - defaultEntry.setMaxExpirePageSize(200); + defaultEntry.setExpireMessagesPeriod(100); + defaultEntry.setMaxExpirePageSize(800); if (memoryLimit) { // so memory is not consumed by DLQ turn if off @@ -106,11 +106,11 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); - producer.setTimeToLive(100); + producer.setTimeToLive(1000); connection.start(); final long sendCount = 2000; - Thread producingThread = new Thread("Producing Thread") { + final Thread producingThread = new Thread("Producing Thread") { public void run() { try { int i = 0; @@ -130,21 +130,27 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { producingThread.start(); - final long expiry = System.currentTimeMillis() + 20*1000; - while (producingThread.isAlive() && expiry > System.currentTimeMillis()) { - producingThread.join(1000); - } - - assertTrue("producer completed within time ", !producingThread.isAlive()); + assertTrue("producer completed within time", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + producingThread.join(1000); + return !producingThread.isAlive(); + } + })); - Thread.sleep(3*expiryPeriod); - DestinationViewMBean view = createView(destination); - assertEquals("All sent have expired ", sendCount, view.getExpiredCount()); + final DestinationViewMBean view = createView(destination); + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return sendCount == view.getExpiredCount(); + } + }); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); + assertEquals("All sent have expired", sendCount, view.getExpiredCount()); } - - - public void testExpiredMessagesWitVerySlowConsumer() throws Exception { + // first ack delivered after expiry + public void testExpiredMessagesWithVerySlowConsumer() throws Exception { createBroker(); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createConnection(); @@ -153,7 +159,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { final int ttl = 4000; producer.setTimeToLive(ttl); - final long sendCount = 1001; + final long sendCount = 1500; final CountDownLatch receivedOneCondition = new CountDownLatch(1); final CountDownLatch waitCondition = new CountDownLatch(1); @@ -165,6 +171,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { LOG.info("Got my message: " + message); receivedOneCondition.countDown(); waitCondition.await(60, TimeUnit.SECONDS); + LOG.info("acking message: " + message); message.acknowledge(); } catch (Exception e) { e.printStackTrace(); @@ -176,7 +183,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { connection.start(); - Thread producingThread = new Thread("Producing Thread") { + final Thread producingThread = new Thread("Producing Thread") { public void run() { try { int i = 0; @@ -195,30 +202,46 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { }; producingThread.start(); + assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); - final long expiry = System.currentTimeMillis() + 20*1000; - while (producingThread.isAlive() && expiry > System.currentTimeMillis()) { - producingThread.join(1000); - } - - assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS)); - assertTrue("producer completed within time ", !producingThread.isAlive()); - - Thread.sleep(2 * Math.max(ttl, expiryPeriod)); - DestinationViewMBean view = createView(destination); + assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + producingThread.join(1000); + return !producingThread.isAlive(); + } + })); + + final DestinationViewMBean view = createView(destination); - assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount()); - assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount()); + assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return 1000 == view.getDispatchCount(); + } + })); + assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return sendCount == view.getExpiredCount(); + } + })); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); // let the ack happen waitCondition.countDown(); - - Thread.sleep(Math.max(ttl, expiryPeriod)); - - assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount()); + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return 0 == view.getInFlightCount(); + } + }); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount()); + assertEquals("size gets back to 0 ", 0, view.getQueueSize()); + assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); consumer.close(); LOG.info("done: " + getName()); diff --git a/activemq-core/src/test/java/org/apache/activemq/util/Wait.java b/activemq-core/src/test/java/org/apache/activemq/util/Wait.java new file mode 100644 index 0000000000..7403422b3c --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/util/Wait.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.util; + + +public class Wait { + + public static final long MAX_WAIT_MILLIS = 30*1000; + + public interface Condition { + boolean isSatisified() throws Exception; + } + + public static boolean waitFor(Condition condition) throws Exception { + return waitFor(condition, MAX_WAIT_MILLIS); + } + + public static boolean waitFor(final Condition condition, final long duration) throws Exception { + final long expiry = System.currentTimeMillis() + duration; + while (!condition.isSatisified() && System.currentTimeMillis() < expiry) { + Thread.sleep(1000); + } + return condition.isSatisified(); + } +}