mirror of https://github.com/apache/activemq.git
added test case to show AMQ-458 working
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@359819 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c2408b8997
commit
27f7cab3e8
|
@ -51,22 +51,22 @@ public class AdvisorySupport {
|
|||
}
|
||||
|
||||
public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
|
||||
String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getQualifiedName();
|
||||
String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
|
||||
String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX+destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getQualifiedName();
|
||||
String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX+destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ public class Topic implements Destination {
|
|||
|
||||
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
|
||||
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
|
||||
private boolean sendAdvisoryIfNoConsumers = true;
|
||||
private boolean sendAdvisoryIfNoConsumers;
|
||||
|
||||
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
|
||||
TaskRunnerFactory taskFactory) {
|
||||
|
@ -320,7 +320,7 @@ public class Topic implements Destination {
|
|||
// letter queue
|
||||
ActiveMQDestination originalDestination = message.getDestination();
|
||||
if (!AdvisorySupport.isAdvisoryTopic(originalDestination)) {
|
||||
ActiveMQTopic advisoryTopic = AdvisorySupport.getExpiredTopicMessageAdvisoryTopic(originalDestination);
|
||||
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(originalDestination);
|
||||
message.setDestination(advisoryTopic);
|
||||
context.getBroker().send(context, message);
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private DispatchPolicy dispatchPolicy;
|
||||
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
|
||||
private RedeliveryPolicy redeliveryPolicy;
|
||||
private boolean sendAdvisoryIfNoConsumers = true;
|
||||
private boolean sendAdvisoryIfNoConsumers;
|
||||
|
||||
public void configure(Queue queue) {
|
||||
if (dispatchPolicy != null) {
|
||||
|
|
|
@ -66,6 +66,10 @@ public class TestSupport extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
protected Destination createDestination() {
|
||||
return createDestination(getClass().getName() + "." + getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param messsage
|
||||
* @param firstSet
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.TestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public abstract class DeadLetterTestSupport extends TestSupport {
|
||||
|
||||
protected int messageCount = 10;
|
||||
protected long timeToLive = 250;
|
||||
protected Connection connection;
|
||||
protected Session session;
|
||||
protected MessageConsumer consumer;
|
||||
protected MessageProducer producer;
|
||||
private Destination destination;
|
||||
protected int deliveryMode = DeliveryMode.PERSISTENT;
|
||||
protected boolean durableSubscriber = false;
|
||||
protected Destination dlqDestination;
|
||||
protected MessageConsumer dlqConsumer;
|
||||
protected BrokerService broker;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
broker = createBroker();
|
||||
broker.start();
|
||||
connection = createConnection();
|
||||
connection.setClientID(toString());
|
||||
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
connection.start();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doTest() throws Exception;
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = new BrokerService();
|
||||
broker.setPersistent(false);
|
||||
return broker;
|
||||
}
|
||||
|
||||
protected void makeConsumer() throws JMSException {
|
||||
Destination destination = getDestination();
|
||||
if (durableSubscriber) {
|
||||
consumer = session.createDurableSubscriber((Topic) destination, destination.toString());
|
||||
}
|
||||
else {
|
||||
consumer = session.createConsumer(destination);
|
||||
}
|
||||
}
|
||||
|
||||
protected void makeDlqConsumer() throws JMSException {
|
||||
dlqDestination = createDlqDestination();
|
||||
|
||||
System.out.println("Consuming from dead letter on: " + dlqDestination);
|
||||
dlqConsumer = session.createConsumer(dlqDestination);
|
||||
}
|
||||
|
||||
protected void sendMessages() throws JMSException {
|
||||
producer = session.createProducer(getDestination());
|
||||
producer.setDeliveryMode(deliveryMode);
|
||||
producer.setTimeToLive(timeToLive);
|
||||
|
||||
System.out.println("Sending " + messageCount + " messages to: " + getDestination());
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
Message message = session.createTextMessage("msg: " + i);
|
||||
producer.send(message);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Destination createDlqDestination();
|
||||
|
||||
public void testTransientTopicMessage() throws Exception {
|
||||
super.topic = true;
|
||||
deliveryMode = DeliveryMode.NON_PERSISTENT;
|
||||
durableSubscriber = true;
|
||||
doTest();
|
||||
}
|
||||
|
||||
public void testDurableTopicMessage() throws Exception {
|
||||
super.topic = true;
|
||||
deliveryMode = DeliveryMode.PERSISTENT;
|
||||
durableSubscriber = true;
|
||||
doTest();
|
||||
}
|
||||
|
||||
public void testTransientQueueMessage() throws Exception {
|
||||
super.topic = false;
|
||||
deliveryMode = DeliveryMode.NON_PERSISTENT;
|
||||
durableSubscriber = false;
|
||||
doTest();
|
||||
}
|
||||
|
||||
public void testDurableQueueMessage() throws Exception {
|
||||
super.topic = false;
|
||||
deliveryMode = DeliveryMode.PERSISTENT;
|
||||
durableSubscriber = false;
|
||||
doTest();
|
||||
}
|
||||
|
||||
public Destination getDestination() {
|
||||
if (destination == null) {
|
||||
destination = createDestination();
|
||||
}
|
||||
return destination;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
**/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class NoConsumerDeadLetterTest extends DeadLetterTestSupport {
|
||||
|
||||
// lets disable the inapplicable tests
|
||||
public void testDurableQueueMessage() throws Exception {
|
||||
}
|
||||
|
||||
public void testDurableTopicMessage() throws Exception {
|
||||
}
|
||||
|
||||
public void testTransientQueueMessage() throws Exception {
|
||||
}
|
||||
|
||||
protected void doTest() throws Exception {
|
||||
makeDlqConsumer();
|
||||
sendMessages();
|
||||
|
||||
for (int i =0; i < messageCount; i++){
|
||||
Message msg = dlqConsumer.receive(1000);
|
||||
assertNotNull("Should be a message for loop: " + i, msg);
|
||||
}
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setSendAdvisoryIfNoConsumers(true);
|
||||
|
||||
PolicyMap pMap = new PolicyMap();
|
||||
pMap.setDefaultEntry(policy);
|
||||
|
||||
broker.setDestinationPolicy(pMap);
|
||||
|
||||
return broker;
|
||||
}
|
||||
|
||||
protected Destination createDlqDestination() {
|
||||
return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination) getDestination());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue