diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 120a4c62a3..b486c30255 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class PolicyEntryTest extends RuntimeConfigTestSupport { @@ -44,6 +45,18 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport { verifyQueueLimit("Before", 4194304); } + @Test + public void testModSendDuplicateFromStoreToDLQ() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-sendDuplicateFromStoreToDLQ"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true); + applyNewConfig(brokerConfig, configurationSeed + "-policy-sendDuplicateFromStoreToDLQ-mod", SLEEP); + verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", false); + } + @Test public void testAddNdMod() throws Exception { final String brokerConfig = configurationSeed + "-policy-ml-broker"; @@ -100,6 +113,26 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport { verifyQueueLimit("queue.child.test2", 4194304); } + private void verifyBooleanField(String dest, String fieldName, boolean value) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue(dest)); + + switch(fieldName) { + case "sendDuplicateFromStoreToDLQ": + assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ()); + break; + default: + fail("Unsupported field specified: " + fieldName); + } + + } finally { + connection.close(); + } + } + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); try { diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ-mod.xml new file mode 100644 index 0000000000..0984792b23 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ-mod.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ.xml new file mode 100644 index 0000000000..cbe6ccbfc0 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-sendDuplicateFromStoreToDLQ.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java new file mode 100644 index 0000000000..adfc88dd79 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/SendDuplicateFromStoreToDLQTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +/** + * This unit test is to test that setting the property "sendDuplicateFromStoreToDLQ" on + * PolicyEntry works correctly. + * + */ +public class SendDuplicateFromStoreToDLQTest { + BrokerService broker; + ConnectionFactory factory; + Connection connection; + Session session; + MessageProducer producer; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + + File testDataDir = new File("target/activemq-data/AMQ-8397"); + broker.setDataDirectoryFile(testDataDir); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); + broker.setPersistenceAdapter(persistenceAdapter); + broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory(broker.getTransportConnectors() + .get(0).getConnectUri().toString()); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @After + public void tearDown() throws Exception { + session.close(); + connection.stop(); + connection.close(); + broker.deleteAllMessages(); + broker.stop(); + } + + @Test + public void testSendDuplicateFromStoreToDLQTrue() throws Exception { + applySendDuplicateFromStoreToDLQPolicy(true); + doProcessSendDuplicateFromStoreToDLQ(true); + } + + @Test + public void testSendDuplicateFromStoreToDLQFalse() throws Exception { + applySendDuplicateFromStoreToDLQPolicy(false); + doProcessSendDuplicateFromStoreToDLQ(false); + } + + protected void doProcessSendDuplicateFromStoreToDLQ(boolean enabled) throws Exception { + createQueue("AMQ.8397"); + org.apache.activemq.broker.region.Queue queue = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("AMQ.8397")); + assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + + MessageConsumer messageConsumer = session.createConsumer(session.createQueue("AMQ.8397")); + producer.send(session.createTextMessage("Hello world!")); + + int loopCount=0; + int maxLoops=50; + boolean found=false; + Message recvMessage = null; + do { + recvMessage = messageConsumer.receive(200l); + if(recvMessage != null) { + found = true; + } + loopCount++; + } while(!found && loopCount < maxLoops); + + assertNotNull(recvMessage); + List queueSubscriptions = queue.getConsumers(); + assertNotNull(queueSubscriptions); + assertEquals(Integer.valueOf(1), Integer.valueOf(queueSubscriptions.size())); + queue.duplicateFromStore((org.apache.activemq.command.Message) recvMessage, queueSubscriptions.get(0)); + + org.apache.activemq.broker.region.Queue dlq = (org.apache.activemq.broker.region.Queue)broker.getDestination(new ActiveMQQueue("ActiveMQ.DLQ.Queue.AMQ.8397")); + + if(enabled) { + assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(1l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); + } else { + assertEquals(Long.valueOf(0l), Long.valueOf(queue.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(1l), Long.valueOf(queue.getDestinationStatistics().getDuplicateFromStore().getCount())); + assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getMessages().getCount())); + assertEquals(Long.valueOf(0l), Long.valueOf(dlq.getDestinationStatistics().getDuplicateFromStore().getCount())); + } + } + + private PolicyMap applySendDuplicateFromStoreToDLQPolicy(boolean enabled) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setSendDuplicateFromStoreToDLQ(enabled); + defaultEntry.setDeadLetterStrategy(new IndividualDeadLetterStrategy()); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + return policyMap; + } + + private void createQueue(String queueName) throws Exception { + Queue queue = session.createQueue(queueName); + producer = session.createProducer(queue); + } +}