mirror of https://github.com/apache/activemq.git
resolve some issue with duplicate expiry due to concurrent execution with expiry task and dispatch/acks etc. some more tests for stats included - https://issues.apache.org/activemq/browse/AMQ-1112
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@793892 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8c6e5dccea
commit
ca242f4a34
|
@ -222,9 +222,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// Don't remove the nodes until we are committed.
|
||||
if (!context.isInTransaction()) {
|
||||
dequeueCounter++;
|
||||
if (!this.getConsumerInfo().isBrowser()) {
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
}
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
removeList.add(node);
|
||||
} else {
|
||||
|
@ -238,7 +235,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
synchronized(dispatchLock) {
|
||||
dequeueCounter++;
|
||||
dispatched.remove(node);
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
prefetchExtension--;
|
||||
}
|
||||
|
@ -287,7 +283,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
MessageId messageId = node.getMessageId();
|
||||
if (ack.getLastMessageId().equals(messageId)) {
|
||||
// this should never be within a transaction
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
destination = node.getRegionDestination();
|
||||
acknowledge(context, ack, node);
|
||||
|
@ -303,16 +298,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
int index = 0;
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
|
||||
final MessageReference node = iter.next();
|
||||
if (hasNotAlreadyExpired(node)) {
|
||||
if (node.isExpired()) {
|
||||
if (node.isExpired()) {
|
||||
if (broker.isExpired(node)) {
|
||||
node.getRegionDestination().messageExpired(context, this, node);
|
||||
dispatched.remove(node);
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
}
|
||||
} else {
|
||||
// already expired
|
||||
dispatched.remove(node);
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
|
||||
}
|
||||
if (ack.getLastMessageId().equals(node.getMessageId())) {
|
||||
prefetchExtension = Math.max(prefetchExtension, index + 1);
|
||||
|
@ -373,9 +364,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
}
|
||||
if (inAckRange) {
|
||||
sendToDLQ(context, node);
|
||||
node.getRegionDestination().getDestinationStatistics()
|
||||
.getDequeues().increment();
|
||||
|
||||
node.getRegionDestination().getDestinationStatistics()
|
||||
.getInflight().increment();
|
||||
|
||||
|
@ -418,16 +406,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean hasNotAlreadyExpired(MessageReference node) {
|
||||
boolean hasNotExpired = true;
|
||||
try {
|
||||
hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION) == null;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION + " for " + node, e);
|
||||
}
|
||||
return hasNotExpired;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks an ack versus the contents of the dispatched list.
|
||||
*
|
||||
|
@ -610,7 +588,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
|
||||
//increment number to dispatch
|
||||
numberToDispatch++;
|
||||
node.getRegionDestination().messageExpired(context, this, node);
|
||||
if (broker.isExpired(node)) {
|
||||
node.getRegionDestination().messageExpired(context, this, node);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
dispatch(node);
|
||||
|
|
|
@ -760,7 +760,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
addAll(pagedInPendingDispatch, l, max, toExpire);
|
||||
for (MessageReference ref : toExpire) {
|
||||
pagedInPendingDispatch.remove(ref);
|
||||
messageExpired(connectionContext, ref);
|
||||
if (broker.isExpired(ref)) {
|
||||
messageExpired(connectionContext, ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
toExpire.clear();
|
||||
|
@ -768,7 +770,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
addAll(pagedInMessages.values(), l, max, toExpire);
|
||||
}
|
||||
for (MessageReference ref : toExpire) {
|
||||
messageExpired(connectionContext, ref);
|
||||
if (broker.isExpired(ref)) {
|
||||
messageExpired(connectionContext, ref);
|
||||
} else {
|
||||
synchronized (pagedInMessages) {
|
||||
pagedInMessages.remove(ref.getMessageId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (l.size() < getMaxBrowsePageSize()) {
|
||||
|
@ -805,7 +813,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
|
||||
&& l.size() < getMaxBrowsePageSize();) {
|
||||
QueueMessageReference ref = i.next();
|
||||
if (broker.isExpired(ref)) {
|
||||
if (ref.isExpired()) {
|
||||
toExpire.add(ref);
|
||||
} else if (l.contains(ref.getMessage()) == false) {
|
||||
l.add(ref.getMessage());
|
||||
|
@ -1224,6 +1232,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
// This sends the ack the the journal..
|
||||
if (!ack.isInTransaction()) {
|
||||
acknowledge(context, sub, ack, reference);
|
||||
getDestinationStatistics().getDequeues().increment();
|
||||
dropMessage(reference);
|
||||
} else {
|
||||
try {
|
||||
|
@ -1232,6 +1241,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
|
||||
public void afterCommit() throws Exception {
|
||||
getDestinationStatistics().getDequeues().increment();
|
||||
dropMessage(reference);
|
||||
wakeup();
|
||||
}
|
||||
|
@ -1264,11 +1274,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
|
||||
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("message expired: " + reference);
|
||||
}
|
||||
broker.messageExpired(context, reference);
|
||||
destinationStatistics.getDequeues().increment();
|
||||
destinationStatistics.getExpired().increment();
|
||||
try {
|
||||
removeMessage(context,subs,(QueueMessageReference)reference);
|
||||
|
|
|
@ -45,6 +45,12 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
|||
* @throws IOException
|
||||
*/
|
||||
protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
|
||||
if (n.isExpired()) {
|
||||
if (!broker.isExpired(n)) {
|
||||
LOG.info("ignoring ack " + ack + ", for already expired message: " + n);
|
||||
return;
|
||||
}
|
||||
}
|
||||
final Destination q = n.getRegionDestination();
|
||||
final QueueMessageReference node = (QueueMessageReference)n;
|
||||
final Queue queue = (Queue)q;
|
||||
|
|
|
@ -664,7 +664,30 @@ public class RegionBroker extends EmptyBroker {
|
|||
}
|
||||
|
||||
public boolean isExpired(MessageReference messageReference) {
|
||||
return messageReference.isExpired();
|
||||
boolean expired = false;
|
||||
if (messageReference.isExpired()) {
|
||||
try {
|
||||
// prevent duplicate expiry processing
|
||||
Message message = messageReference.getMessage();
|
||||
synchronized (message) {
|
||||
expired = stampAsExpired(message);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
|
||||
}
|
||||
}
|
||||
return expired;
|
||||
}
|
||||
|
||||
private boolean stampAsExpired(Message message) throws IOException {
|
||||
boolean stamped=false;
|
||||
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
|
||||
long expiration=message.getExpiration();
|
||||
message.setExpiration(0);
|
||||
message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
|
||||
stamped = true;
|
||||
}
|
||||
return stamped;
|
||||
}
|
||||
|
||||
public void messageExpired(ConnectionContext context, MessageReference node) {
|
||||
|
@ -679,7 +702,8 @@ public class RegionBroker extends EmptyBroker {
|
|||
try{
|
||||
if(node!=null){
|
||||
Message message=node.getMessage();
|
||||
if(message!=null&&node.getRegionDestination()!=null){
|
||||
stampAsExpired(message);
|
||||
if(message!=null && node.getRegionDestination()!=null){
|
||||
DeadLetterStrategy deadLetterStrategy=node
|
||||
.getRegionDestination().getDeadLetterStrategy();
|
||||
if(deadLetterStrategy!=null){
|
||||
|
@ -688,10 +712,6 @@ public class RegionBroker extends EmptyBroker {
|
|||
// message may be inflight to other subscriptions so do not modify
|
||||
message = message.copy();
|
||||
}
|
||||
long expiration=message.getExpiration();
|
||||
message.setExpiration(0);
|
||||
message.setProperty(ORIGINAL_EXPIRATION,new Long(
|
||||
expiration));
|
||||
if(!message.isPersistent()){
|
||||
message.setPersistent(true);
|
||||
message.setProperty("originalDeliveryMode",
|
||||
|
|
|
@ -611,14 +611,6 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
long expireTime = this.getExpiration();
|
||||
if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public Callback getAcknowledgeCallback() {
|
||||
return acknowledgeCallback;
|
||||
}
|
||||
|
|
|
@ -88,6 +88,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
|||
private transient ActiveMQConnection connection;
|
||||
private transient org.apache.activemq.broker.region.Destination regionDestination;
|
||||
private transient MemoryUsage memoryUsage;
|
||||
private transient boolean expired;
|
||||
|
||||
private BrokerId[] brokerPath;
|
||||
private BrokerId[] cluster;
|
||||
|
@ -338,6 +339,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
|||
|
||||
public void setExpiration(long expiration) {
|
||||
this.expiration = expiration;
|
||||
if (this.expiration > 0) {
|
||||
expired = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -435,11 +439,13 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
|
|||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
long expireTime = getExpiration();
|
||||
if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
|
||||
return true;
|
||||
if (!expired) {
|
||||
long expireTime = getExpiration();
|
||||
if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
|
||||
expired = true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return expired;
|
||||
}
|
||||
|
||||
public boolean isAdvisory() {
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePoli
|
|||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -107,7 +108,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
while (i++ < 30000) {
|
||||
while (i++ < 10000) {
|
||||
producer.send(session.createTextMessage("test"));
|
||||
}
|
||||
producer.close();
|
||||
|
@ -123,21 +124,41 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
producingThread.join();
|
||||
session.close();
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
DestinationViewMBean view = createView(destination);
|
||||
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
|
||||
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
|
||||
final DestinationViewMBean view = createView(destination);
|
||||
|
||||
assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
|
||||
|
||||
long expiry = System.currentTimeMillis() + 30000;
|
||||
while (view.getInFlightCount() > 0 && System.currentTimeMillis() < expiry) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
|
||||
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
|
||||
// wait for all to inflight to expire
|
||||
assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return view.getInFlightCount() == 0;
|
||||
}
|
||||
}));
|
||||
assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
|
||||
|
||||
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
|
||||
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
|
||||
|
||||
// wait for all sent to get delivered and expire
|
||||
assertTrue("all sent messages expired ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
long oldEnqueues = view.getEnqueueCount();
|
||||
Thread.sleep(200);
|
||||
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
|
||||
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
|
||||
return oldEnqueues == view.getEnqueueCount();
|
||||
}
|
||||
}, 60*1000));
|
||||
|
||||
|
||||
LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
|
||||
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
|
||||
|
||||
assertTrue("got at least what did not expire", received.get() >= view.getDequeueCount() - view.getExpiredCount());
|
||||
|
||||
assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return view.getQueueSize() == 0;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
|
@ -229,6 +250,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
|||
defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
|
||||
}
|
||||
defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
|
||||
defaultPolicy.setMaxExpirePageSize(1200);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setDefaultEntry(defaultPolicy);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
|||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -47,7 +48,6 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(ExpiredMessagesWithNoConsumerTest.class);
|
||||
|
||||
private static final int expiryPeriod = 1000;
|
||||
|
||||
BrokerService broker;
|
||||
Connection connection;
|
||||
|
@ -81,8 +81,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setExpireMessagesPeriod(expiryPeriod);
|
||||
defaultEntry.setMaxExpirePageSize(200);
|
||||
defaultEntry.setExpireMessagesPeriod(100);
|
||||
defaultEntry.setMaxExpirePageSize(800);
|
||||
|
||||
if (memoryLimit) {
|
||||
// so memory is not consumed by DLQ turn if off
|
||||
|
@ -106,11 +106,11 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(destination);
|
||||
producer.setTimeToLive(100);
|
||||
producer.setTimeToLive(1000);
|
||||
connection.start();
|
||||
final long sendCount = 2000;
|
||||
|
||||
Thread producingThread = new Thread("Producing Thread") {
|
||||
final Thread producingThread = new Thread("Producing Thread") {
|
||||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
|
@ -130,21 +130,27 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
|
||||
producingThread.start();
|
||||
|
||||
final long expiry = System.currentTimeMillis() + 20*1000;
|
||||
while (producingThread.isAlive() && expiry > System.currentTimeMillis()) {
|
||||
producingThread.join(1000);
|
||||
}
|
||||
|
||||
assertTrue("producer completed within time ", !producingThread.isAlive());
|
||||
assertTrue("producer completed within time", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
producingThread.join(1000);
|
||||
return !producingThread.isAlive();
|
||||
}
|
||||
}));
|
||||
|
||||
Thread.sleep(3*expiryPeriod);
|
||||
DestinationViewMBean view = createView(destination);
|
||||
assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
|
||||
final DestinationViewMBean view = createView(destination);
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return sendCount == view.getExpiredCount();
|
||||
}
|
||||
});
|
||||
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||
+ ", size= " + view.getQueueSize());
|
||||
assertEquals("All sent have expired", sendCount, view.getExpiredCount());
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void testExpiredMessagesWitVerySlowConsumer() throws Exception {
|
||||
// first ack delivered after expiry
|
||||
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
|
||||
createBroker();
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
connection = factory.createConnection();
|
||||
|
@ -153,7 +159,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
final int ttl = 4000;
|
||||
producer.setTimeToLive(ttl);
|
||||
|
||||
final long sendCount = 1001;
|
||||
final long sendCount = 1500;
|
||||
final CountDownLatch receivedOneCondition = new CountDownLatch(1);
|
||||
final CountDownLatch waitCondition = new CountDownLatch(1);
|
||||
|
||||
|
@ -165,6 +171,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
LOG.info("Got my message: " + message);
|
||||
receivedOneCondition.countDown();
|
||||
waitCondition.await(60, TimeUnit.SECONDS);
|
||||
LOG.info("acking message: " + message);
|
||||
message.acknowledge();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -176,7 +183,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
connection.start();
|
||||
|
||||
|
||||
Thread producingThread = new Thread("Producing Thread") {
|
||||
final Thread producingThread = new Thread("Producing Thread") {
|
||||
public void run() {
|
||||
try {
|
||||
int i = 0;
|
||||
|
@ -195,30 +202,46 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
|
|||
};
|
||||
|
||||
producingThread.start();
|
||||
assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS));
|
||||
|
||||
final long expiry = System.currentTimeMillis() + 20*1000;
|
||||
while (producingThread.isAlive() && expiry > System.currentTimeMillis()) {
|
||||
producingThread.join(1000);
|
||||
}
|
||||
|
||||
assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS));
|
||||
assertTrue("producer completed within time ", !producingThread.isAlive());
|
||||
|
||||
Thread.sleep(2 * Math.max(ttl, expiryPeriod));
|
||||
DestinationViewMBean view = createView(destination);
|
||||
assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
producingThread.join(1000);
|
||||
return !producingThread.isAlive();
|
||||
}
|
||||
}));
|
||||
|
||||
final DestinationViewMBean view = createView(destination);
|
||||
|
||||
assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount());
|
||||
assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount());
|
||||
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 1000 == view.getDispatchCount();
|
||||
}
|
||||
}));
|
||||
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return sendCount == view.getExpiredCount();
|
||||
}
|
||||
}));
|
||||
|
||||
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||
+ ", size= " + view.getQueueSize());
|
||||
|
||||
// let the ack happen
|
||||
waitCondition.countDown();
|
||||
|
||||
Thread.sleep(Math.max(ttl, expiryPeriod));
|
||||
|
||||
assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount());
|
||||
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 0 == view.getInFlightCount();
|
||||
}
|
||||
});
|
||||
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
|
||||
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
|
||||
+ ", size= " + view.getQueueSize());
|
||||
assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
|
||||
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
|
||||
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
|
||||
|
||||
consumer.close();
|
||||
LOG.info("done: " + getName());
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.util;
|
||||
|
||||
|
||||
public class Wait {
|
||||
|
||||
public static final long MAX_WAIT_MILLIS = 30*1000;
|
||||
|
||||
public interface Condition {
|
||||
boolean isSatisified() throws Exception;
|
||||
}
|
||||
|
||||
public static boolean waitFor(Condition condition) throws Exception {
|
||||
return waitFor(condition, MAX_WAIT_MILLIS);
|
||||
}
|
||||
|
||||
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
|
||||
final long expiry = System.currentTimeMillis() + duration;
|
||||
while (!condition.isSatisified() && System.currentTimeMillis() < expiry) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
return condition.isSatisified();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue