From 10c6b2964fc716a10f22f5b37bbac46c3a0f4a7d Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 30 Dec 2005 15:36:19 +0000 Subject: [PATCH] added test cases to demonstrate shared and individual DLQ strategies; which highlight a bug in the rollback logic in the client git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360108 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 7 +- .../broker/region/policy/PolicyEntry.java | 6 ++ .../broker/policy/DeadLetterTest.java | 91 +++++++++++++++++++ .../broker/policy/DeadLetterTestSupport.java | 29 ++++-- .../policy/IndividualDeadLetterTest.java | 52 +++++++++++ 5 files changed, 176 insertions(+), 9 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index afed99c2e9..4c1ad1fdd0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -175,13 +175,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription { Message message = node.getMessage(); if( message !=null ) { - // TODO is this meant to be == null? - if( message.getOriginalDestination()!=null ) + // TODO is this meant to be == null - it was != ? + if( message.getOriginalDestination()==null ) message.setOriginalDestination(message.getDestination()); ActiveMQDestination originalDestination = message.getOriginalDestination(); DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy(); - message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination)); + ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(originalDestination); + message.setDestination(deadLetterDestination); if( message.getOriginalTransactionId()!=null ) message.setOriginalTransactionId(message.getTransactionId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 23ea86aaa7..371172b04f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -41,12 +41,18 @@ public class PolicyEntry extends DestinationMapEntry { if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } + if (deadLetterStrategy != null) { + queue.setDeadLetterStrategy(deadLetterStrategy); + } } public void configure(Topic topic) { if (dispatchPolicy != null) { topic.setDispatchPolicy(dispatchPolicy); } + if (deadLetterStrategy != null) { + topic.setDeadLetterStrategy(deadLetterStrategy); + } if (subscriptionRecoveryPolicy != null) { topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java new file mode 100644 index 0000000000..e6b5d4306f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java @@ -0,0 +1,91 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.RedeliveryPolicy; + +import javax.jms.Destination; +import javax.jms.Message; + +/** + * + * @version $Revision$ + */ +public class DeadLetterTest extends DeadLetterTestSupport { + + private int rollbackCount; + + protected void doTest() throws Exception { + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + System.out.println("Will redeliver messages: " + rollbackCount + " times"); + + makeConsumer(); + makeDlqConsumer(); + + sendMessages(); + + // now lets receive and rollback N times + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertMessage(msg, i); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + } + } + + protected void consumeAndRollback(int messageCounter) throws Exception { + for (int i = 0; i < rollbackCount; i++) { + Message message = consumer.receive(5000); + assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message); + assertMessage(message, messageCounter); + + session.rollback(); + } + System.out.println("Rolled back: " + rollbackCount + " times"); + } + + protected void setUp() throws Exception { + transactedMode = true; + super.setUp(); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory answer = super.createConnectionFactory(); + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setMaximumRedeliveries(3); + policy.setBackOffMultiplier((short) 1); + policy.setInitialRedeliveryDelay(10); + policy.setUseExponentialBackOff(false); + answer.setRedeliveryPolicy(policy); + return answer; + } + + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index 0b37399f44..a8475c14b0 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -18,9 +18,6 @@ package org.apache.activemq.broker.policy; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -30,6 +27,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import javax.jms.Topic; /** @@ -49,6 +47,8 @@ public abstract class DeadLetterTestSupport extends TestSupport { protected Destination dlqDestination; protected MessageConsumer dlqConsumer; protected BrokerService broker; + protected boolean transactedMode = false; + protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE; protected void setUp() throws Exception { super.setUp(); @@ -57,7 +57,7 @@ public abstract class DeadLetterTestSupport extends TestSupport { connection = createConnection(); connection.setClientID(toString()); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(transactedMode, acknowledgeMode); connection.start(); } @@ -80,6 +80,7 @@ public abstract class DeadLetterTestSupport extends TestSupport { protected void makeConsumer() throws JMSException { Destination destination = getDestination(); + System.out.println("Consuming from: " + destination); if (durableSubscriber) { consumer = session.createDurableSubscriber((Topic) destination, destination.toString()); } @@ -96,17 +97,34 @@ public abstract class DeadLetterTestSupport extends TestSupport { } protected void sendMessages() throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(getDestination()); producer.setDeliveryMode(deliveryMode); producer.setTimeToLive(timeToLive); System.out.println("Sending " + messageCount + " messages to: " + getDestination()); for (int i = 0; i < messageCount; i++) { - Message message = session.createTextMessage("msg: " + i); + Message message = createMessage(session, i); producer.send(message); } } + protected TextMessage createMessage(Session session, int i) throws JMSException { + return session.createTextMessage(getMessageText(i)); + } + + protected String getMessageText(int i) { + return "message: " + i; + } + + protected void assertMessage(Message message, int i) throws Exception { + System.out.println("Received message: " + message); + assertNotNull("No message received for index: " + i, message); + assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage); + TextMessage textMessage = (TextMessage) message; + assertEquals("text of message: " + i, getMessageText(i), textMessage .getText()); + } + protected abstract Destination createDlqDestination(); public void testTransientTopicMessage() throws Exception { @@ -143,5 +161,4 @@ public abstract class DeadLetterTestSupport extends TestSupport { } return destination; } - } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java new file mode 100644 index 0000000000..4b5ae57ab7 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java @@ -0,0 +1,52 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; + +import javax.jms.Destination; + +/** + * + * @version $Revision$ + */ +public class IndividualDeadLetterTest extends DeadLetterTest { + + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName()); + } +}