From 27f7cab3e8bebad9a6d2c96ebb81482e2a56dd69 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 29 Dec 2005 15:17:19 +0000 Subject: [PATCH] added test case to show AMQ-458 working git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359819 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisorySupport.java | 8 +- .../apache/activemq/broker/region/Topic.java | 4 +- .../broker/region/policy/PolicyEntry.java | 2 +- .../java/org/apache/activemq/TestSupport.java | 4 + .../broker/policy/DeadLetterTestSupport.java | 147 ++++++++++++++++++ .../policy/NoConsumerDeadLetterTest.java | 73 +++++++++ 6 files changed, 231 insertions(+), 7 deletions(-) create mode 100755 activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 215149ac88..bc5af9d9fd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -51,22 +51,22 @@ public class AdvisorySupport { } public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) { - String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName(); + String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName(); return new ActiveMQTopic(name); } public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) { - String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName(); + String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName(); return new ActiveMQTopic(name); } public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) { - String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName(); + String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getPhysicalName(); return new ActiveMQTopic(name); } public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) { - String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName(); + String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getPhysicalName(); return new ActiveMQTopic(name); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index fd58436dbe..0643578572 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -59,7 +59,7 @@ public class Topic implements Destination { private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); - private boolean sendAdvisoryIfNoConsumers = true; + private boolean sendAdvisoryIfNoConsumers; public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) { @@ -320,7 +320,7 @@ public class Topic implements Destination { // letter queue ActiveMQDestination originalDestination = message.getDestination(); if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) { - ActiveMQTopic advisoryTopic = AdvisorySupport.getExpiredTopicMessageAdvisoryTopic(originalDestination); + ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination); message.setDestination(advisoryTopic); context.getBroker().send(context, message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 1efbadc3fe..3f71a39efb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -34,7 +34,7 @@ public class PolicyEntry extends DestinationMapEntry { private DispatchPolicy dispatchPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private RedeliveryPolicy redeliveryPolicy; - private boolean sendAdvisoryIfNoConsumers = true; + private boolean sendAdvisoryIfNoConsumers; public void configure(Queue queue) { if (dispatchPolicy != null) { diff --git a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java index f7070897b4..d816fad143 100755 --- a/activemq-core/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/TestSupport.java @@ -66,6 +66,10 @@ public class TestSupport extends TestCase { } } + protected Destination createDestination() { + return createDestination(getClass().getName() + "." + getName()); + } + /** * @param messsage * @param firstSet diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java new file mode 100755 index 0000000000..0b37399f44 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -0,0 +1,147 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +/** + * @version $Revision$ + */ +public abstract class DeadLetterTestSupport extends TestSupport { + + protected int messageCount = 10; + protected long timeToLive = 250; + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + private Destination destination; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected boolean durableSubscriber = false; + protected Destination dlqDestination; + protected MessageConsumer dlqConsumer; + protected BrokerService broker; + + protected void setUp() throws Exception { + super.setUp(); + broker = createBroker(); + broker.start(); + connection = createConnection(); + connection.setClientID(toString()); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + if (broker != null) { + broker.stop(); + } + } + + protected abstract void doTest() throws Exception; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + return broker; + } + + protected void makeConsumer() throws JMSException { + Destination destination = getDestination(); + if (durableSubscriber) { + consumer = session.createDurableSubscriber((Topic) destination, destination.toString()); + } + else { + consumer = session.createConsumer(destination); + } + } + + protected void makeDlqConsumer() throws JMSException { + dlqDestination = createDlqDestination(); + + System.out.println("Consuming from dead letter on: " + dlqDestination); + dlqConsumer = session.createConsumer(dlqDestination); + } + + protected void sendMessages() throws JMSException { + producer = session.createProducer(getDestination()); + producer.setDeliveryMode(deliveryMode); + producer.setTimeToLive(timeToLive); + + System.out.println("Sending " + messageCount + " messages to: " + getDestination()); + for (int i = 0; i < messageCount; i++) { + Message message = session.createTextMessage("msg: " + i); + producer.send(message); + } + } + + protected abstract Destination createDlqDestination(); + + public void testTransientTopicMessage() throws Exception { + super.topic = true; + deliveryMode = DeliveryMode.NON_PERSISTENT; + durableSubscriber = true; + doTest(); + } + + public void testDurableTopicMessage() throws Exception { + super.topic = true; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = true; + doTest(); + } + + public void testTransientQueueMessage() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + durableSubscriber = false; + doTest(); + } + + public void testDurableQueueMessage() throws Exception { + super.topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + durableSubscriber = false; + doTest(); + } + + public Destination getDestination() { + if (destination == null) { + destination = createDestination(); + } + return destination; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java new file mode 100644 index 0000000000..050585ef5f --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java @@ -0,0 +1,73 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; + +import javax.jms.Destination; +import javax.jms.Message; + +/** + * + * @version $Revision$ + */ +public class NoConsumerDeadLetterTest extends DeadLetterTestSupport { + + // lets disable the inapplicable tests + public void testDurableQueueMessage() throws Exception { + } + + public void testDurableTopicMessage() throws Exception { + } + + public void testTransientQueueMessage() throws Exception { + } + + protected void doTest() throws Exception { + makeDlqConsumer(); + sendMessages(); + + for (int i =0; i < messageCount; i++){ + Message msg = dlqConsumer.receive(1000); + assertNotNull("Should be a message for loop: " + i, msg); + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setSendAdvisoryIfNoConsumers(true); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + protected Destination createDlqDestination() { + return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination) getDestination()); + } + +}