mirror of https://github.com/apache/activemq.git
modify test to validate setting isDLQ flag via startup destination query options
This commit is contained in:
parent
35f30102a6
commit
18d05ba5e0
|
@ -41,6 +41,7 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.leveldb.LevelDBStore;
|
import org.apache.activemq.leveldb.LevelDBStore;
|
||||||
import org.apache.activemq.util.IOHelper;
|
import org.apache.activemq.util.IOHelper;
|
||||||
|
@ -78,7 +79,7 @@ public class AMQ6059Test {
|
||||||
@Test
|
@Test
|
||||||
public void testDLQRecovery() throws Exception {
|
public void testDLQRecovery() throws Exception {
|
||||||
|
|
||||||
sendMessage(new ActiveMQQueue("leveldbQueue"));
|
sendMessage(new ActiveMQQueue("QName"));
|
||||||
TimeUnit.SECONDS.sleep(3);
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
|
||||||
LOG.info("### Check for expired message moving to DLQ.");
|
LOG.info("### Check for expired message moving to DLQ.");
|
||||||
|
@ -101,7 +102,8 @@ public class AMQ6059Test {
|
||||||
}));
|
}));
|
||||||
|
|
||||||
verifyMessageIsRecovered(dlqQueue);
|
verifyMessageIsRecovered(dlqQueue);
|
||||||
restartBroker(broker);
|
restartBroker();
|
||||||
|
verifyIsDlq(dlqQueue);
|
||||||
verifyMessageIsRecovered(dlqQueue);
|
verifyMessageIsRecovered(dlqQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,24 +113,16 @@ public class AMQ6059Test {
|
||||||
|
|
||||||
private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
|
private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
|
||||||
BrokerService broker = new BrokerService();
|
BrokerService broker = new BrokerService();
|
||||||
|
ActiveMQQueue dlq = new ActiveMQQueue("ActiveMQ.DLQ?isDLQ=true");
|
||||||
|
|
||||||
File directory = new File("target/activemq-data/leveldb");
|
broker.setDestinations(new ActiveMQDestination[]{dlq});
|
||||||
if (purge) {
|
|
||||||
IOHelper.deleteChildren(directory);
|
|
||||||
}
|
|
||||||
|
|
||||||
LevelDBStore levelDBStore = new LevelDBStore();
|
|
||||||
levelDBStore.setDirectory(directory);
|
|
||||||
if (purge) {
|
|
||||||
levelDBStore.deleteAllMessages();
|
|
||||||
}
|
|
||||||
|
|
||||||
PolicyMap pMap = new PolicyMap();
|
PolicyMap pMap = new PolicyMap();
|
||||||
|
|
||||||
SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy();
|
SharedDeadLetterStrategy sharedDLQStrategy = new SharedDeadLetterStrategy();
|
||||||
sharedDLQStrategy.setProcessNonPersistent(true);
|
sharedDLQStrategy.setProcessNonPersistent(true);
|
||||||
sharedDLQStrategy.setProcessExpired(true);
|
sharedDLQStrategy.setProcessExpired(true);
|
||||||
sharedDLQStrategy.setDeadLetterQueue(new ActiveMQQueue("ActiveMQ.DLQ"));
|
sharedDLQStrategy.setDeadLetterQueue(dlq);
|
||||||
sharedDLQStrategy.setExpiration(10000);
|
sharedDLQStrategy.setExpiration(10000);
|
||||||
|
|
||||||
PolicyEntry defaultPolicy = new PolicyEntry();
|
PolicyEntry defaultPolicy = new PolicyEntry();
|
||||||
|
@ -138,7 +132,6 @@ public class AMQ6059Test {
|
||||||
|
|
||||||
pMap.put(new ActiveMQQueue(">"), defaultPolicy);
|
pMap.put(new ActiveMQQueue(">"), defaultPolicy);
|
||||||
broker.setDestinationPolicy(pMap);
|
broker.setDestinationPolicy(pMap);
|
||||||
broker.setPersistenceAdapter(levelDBStore);
|
|
||||||
if (purge) {
|
if (purge) {
|
||||||
broker.setDeleteAllMessagesOnStartup(true);
|
broker.setDeleteAllMessagesOnStartup(true);
|
||||||
}
|
}
|
||||||
|
@ -146,7 +139,7 @@ public class AMQ6059Test {
|
||||||
return broker;
|
return broker;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void restartBroker(BrokerService broker) throws Exception {
|
private void restartBroker() throws Exception {
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
broker = createBrokerWithDLQ(false);
|
broker = createBrokerWithDLQ(false);
|
||||||
|
|
Loading…
Reference in New Issue