mirror of https://github.com/apache/activemq.git
NO-JIRA: Add a delay to fail in case an async operation has not fully
created the DLQ yet.
This commit is contained in:
parent
4c1a6b4c2b
commit
8be1486508
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -37,13 +37,12 @@ import org.apache.activemq.broker.region.RegionBroker;
|
|||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class DeadLetterTestSupport extends TestSupport {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DeadLetterTestSupport.class);
|
||||
|
||||
protected int messageCount = 10;
|
||||
|
@ -62,6 +61,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
|
||||
protected Destination destination;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
broker = createBroker();
|
||||
|
@ -77,6 +77,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
return toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
|
@ -118,7 +119,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
LOG.info("Consuming from dead letter on: " + dlqDestination);
|
||||
dlqConsumer = session.createConsumer(dlqDestination);
|
||||
}
|
||||
|
||||
|
||||
protected void makeDlqBrowser() throws Exception {
|
||||
dlqDestination = createDlqDestination();
|
||||
|
||||
|
@ -127,9 +128,20 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
verifyIsDlq((Queue) dlqDestination);
|
||||
}
|
||||
|
||||
protected void verifyIsDlq(Queue dlqQ) throws Exception {
|
||||
final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName());
|
||||
assertTrue("is dlq", queueViewMBean.isDLQ());
|
||||
protected void verifyIsDlq(final Queue dlqQ) throws Exception {
|
||||
assertTrue("Need to verify a DLQ exists: " + dlqQ.getQueueName(), Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
QueueViewMBean dlqView = null;
|
||||
try {
|
||||
dlqView = getProxyToQueue(dlqQ.getQueueName());
|
||||
} catch (Throwable error) {
|
||||
}
|
||||
|
||||
return dlqView != null ? dlqView.isDLQ() : false;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
protected void sendMessages() throws JMSException {
|
||||
|
@ -182,9 +194,9 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
deliveryMode = DeliveryMode.NON_PERSISTENT;
|
||||
durableSubscriber = false;
|
||||
doTest();
|
||||
validateConsumerPrefetch(this.getDestinationString(), 0);
|
||||
validateConsumerPrefetch(this.getDestinationString(), 0);
|
||||
}
|
||||
|
||||
|
||||
public void testDurableQueueMessage() throws Exception {
|
||||
super.topic = false;
|
||||
deliveryMode = DeliveryMode.PERSISTENT;
|
||||
|
@ -199,7 +211,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
|
||||
private void validateConsumerPrefetch(String destination, long expectedCount) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
|
@ -210,8 +222,8 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
|||
if (dest.getName().equals(destination)) {
|
||||
DestinationStatistics stats = dest.getDestinationStatistics();
|
||||
LOG.info(">>>> inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
|
||||
assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
|
||||
expectedCount, stats.getInflight().getCount());
|
||||
assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
|
||||
expectedCount, stats.getInflight().getCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -35,10 +35,8 @@ import org.apache.activemq.command.ActiveMQTopic;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IndividualDeadLetterTest.class);
|
||||
|
||||
@Override
|
||||
|
@ -99,7 +97,6 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
|
|||
Queue testQueue = new ActiveMQQueue("ActiveMQ.DLQ.Queue.ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
|
||||
MessageConsumer testConsumer = session.createConsumer(testQueue);
|
||||
assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
|
||||
|
||||
}
|
||||
|
||||
protected void browseDlq() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue