added test cases to demonstrate shared and individual DLQ strategies; which highlight a bug in the rollback logic in the client

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360108 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-30 15:36:19 +00:00
parent ffb9d2aabb
commit 10c6b2964f
5 changed files with 176 additions and 9 deletions

View File

@ -175,13 +175,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
Message message = node.getMessage(); Message message = node.getMessage();
if( message !=null ) { if( message !=null ) {
// TODO is this meant to be == null? // TODO is this meant to be == null - it was != ?
if( message.getOriginalDestination()!=null ) if( message.getOriginalDestination()==null )
message.setOriginalDestination(message.getDestination()); message.setOriginalDestination(message.getDestination());
ActiveMQDestination originalDestination = message.getOriginalDestination(); ActiveMQDestination originalDestination = message.getOriginalDestination();
DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy(); DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination)); ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
message.setDestination(deadLetterDestination);
if( message.getOriginalTransactionId()!=null ) if( message.getOriginalTransactionId()!=null )
message.setOriginalTransactionId(message.getTransactionId()); message.setOriginalTransactionId(message.getTransactionId());

View File

@ -41,12 +41,18 @@ public class PolicyEntry extends DestinationMapEntry {
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy); queue.setDispatchPolicy(dispatchPolicy);
} }
if (deadLetterStrategy != null) {
queue.setDeadLetterStrategy(deadLetterStrategy);
}
} }
public void configure(Topic topic) { public void configure(Topic topic) {
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy); topic.setDispatchPolicy(dispatchPolicy);
} }
if (deadLetterStrategy != null) {
topic.setDeadLetterStrategy(deadLetterStrategy);
}
if (subscriptionRecoveryPolicy != null) { if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy); topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
} }

View File

@ -0,0 +1,91 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.policy;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.RedeliveryPolicy;
import javax.jms.Destination;
import javax.jms.Message;
/**
*
* @version $Revision$
*/
public class DeadLetterTest extends DeadLetterTestSupport {
private int rollbackCount;
protected void doTest() throws Exception {
connection.start();
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
System.out.println("Will redeliver messages: " + rollbackCount + " times");
makeConsumer();
makeDlqConsumer();
sendMessages();
// now lets receive and rollback N times
for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}
for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertMessage(msg, i);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
}
}
protected void consumeAndRollback(int messageCounter) throws Exception {
for (int i = 0; i < rollbackCount; i++) {
Message message = consumer.receive(5000);
assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message);
assertMessage(message, messageCounter);
session.rollback();
}
System.out.println("Rolled back: " + rollbackCount + " times");
}
protected void setUp() throws Exception {
transactedMode = true;
super.setUp();
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory answer = super.createConnectionFactory();
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
policy.setBackOffMultiplier((short) 1);
policy.setInitialRedeliveryDelay(10);
policy.setUseExponentialBackOff(false);
answer.setRedeliveryPolicy(policy);
return answer;
}
protected Destination createDlqDestination() {
return new ActiveMQQueue("ActiveMQ.DLQ");
}
}

View File

@ -18,9 +18,6 @@ package org.apache.activemq.broker.policy;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
@ -30,6 +27,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
/** /**
@ -49,6 +47,8 @@ public abstract class DeadLetterTestSupport extends TestSupport {
protected Destination dlqDestination; protected Destination dlqDestination;
protected MessageConsumer dlqConsumer; protected MessageConsumer dlqConsumer;
protected BrokerService broker; protected BrokerService broker;
protected boolean transactedMode = false;
protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
@ -57,7 +57,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
connection = createConnection(); connection = createConnection();
connection.setClientID(toString()); connection.setClientID(toString());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(transactedMode, acknowledgeMode);
connection.start(); connection.start();
} }
@ -80,6 +80,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
protected void makeConsumer() throws JMSException { protected void makeConsumer() throws JMSException {
Destination destination = getDestination(); Destination destination = getDestination();
System.out.println("Consuming from: " + destination);
if (durableSubscriber) { if (durableSubscriber) {
consumer = session.createDurableSubscriber((Topic) destination, destination.toString()); consumer = session.createDurableSubscriber((Topic) destination, destination.toString());
} }
@ -96,17 +97,34 @@ public abstract class DeadLetterTestSupport extends TestSupport {
} }
protected void sendMessages() throws JMSException { protected void sendMessages() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(getDestination()); producer = session.createProducer(getDestination());
producer.setDeliveryMode(deliveryMode); producer.setDeliveryMode(deliveryMode);
producer.setTimeToLive(timeToLive); producer.setTimeToLive(timeToLive);
System.out.println("Sending " + messageCount + " messages to: " + getDestination()); System.out.println("Sending " + messageCount + " messages to: " + getDestination());
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
Message message = session.createTextMessage("msg: " + i); Message message = createMessage(session, i);
producer.send(message); producer.send(message);
} }
} }
protected TextMessage createMessage(Session session, int i) throws JMSException {
return session.createTextMessage(getMessageText(i));
}
protected String getMessageText(int i) {
return "message: " + i;
}
protected void assertMessage(Message message, int i) throws Exception {
System.out.println("Received message: " + message);
assertNotNull("No message received for index: " + i, message);
assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage) message;
assertEquals("text of message: " + i, getMessageText(i), textMessage .getText());
}
protected abstract Destination createDlqDestination(); protected abstract Destination createDlqDestination();
public void testTransientTopicMessage() throws Exception { public void testTransientTopicMessage() throws Exception {
@ -143,5 +161,4 @@ public abstract class DeadLetterTestSupport extends TestSupport {
} }
return destination; return destination;
} }
} }

View File

@ -0,0 +1,52 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.policy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import javax.jms.Destination;
/**
*
* @version $Revision$
*/
public class IndividualDeadLetterTest extends DeadLetterTest {
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker;
}
protected Destination createDlqDestination() {
return new ActiveMQQueue("ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
}
}