From be919fbc94230ba48a4bc762a9d69df86c70066a Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 24 Feb 2015 15:36:19 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4483 - rework to use destination option to indicate dlq, dlq strategy is typically not in place for dlq dests, option is set when a dlq is first used via region broker sendTodlq, fix and tests --- .../activemq/broker/jmx/DestinationView.java | 2 +- .../activemq/broker/region/BaseDestination.java | 5 ++--- .../apache/activemq/broker/region/Destination.java | 2 -- .../activemq/broker/region/DestinationFilter.java | 5 ----- .../activemq/broker/region/RegionBroker.java | 1 + .../broker/region/policy/DeadLetterStrategy.java | 2 -- .../policy/IndividualDeadLetterStrategy.java | 14 -------------- .../region/policy/SharedDeadLetterStrategy.java | 8 -------- .../broker/view/BrokerDestinationView.java | 2 +- .../activemq/command/ActiveMQDestination.java | 13 +++++++++++++ .../test/java/org/apache/activemq/TestSupport.java | 12 ++++++++++++ .../activemq/broker/policy/DeadLetterTest.java | 2 ++ .../broker/policy/DeadLetterTestSupport.java | 11 +++++++++-- .../broker/policy/IndividualDeadLetterTest.java | 4 +++- .../apache/activemq/usecases/MemoryLimitTest.java | 7 ------- 15 files changed, 44 insertions(+), 46 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 8330231ee5..bf9d0d528e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -524,7 +524,7 @@ public class DestinationView implements DestinationViewMBean { @Override public boolean isDLQ() { - return destination.isDLQ(); + return destination.getActiveMQDestination().isDLQ(); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5a41df3d84..5d51b24300 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -843,9 +843,8 @@ public abstract class BaseDestination implements Destination { return ack; } - @Override - public boolean isDLQ() { - return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination()); + protected boolean isDLQ() { + return destination.isDLQ(); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 16173e5f35..7413a14693 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -241,7 +241,5 @@ public interface Destination extends Service, Task, Message.MessageDestination { public void clearPendingMessages(); - public boolean isDLQ(); - void duplicateFromStore(Message message, Subscription subscription); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index dfc3841337..7f253767ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -383,11 +383,6 @@ public class DestinationFilter implements Destination { next.clearPendingMessages(); } - @Override - public boolean isDLQ() { - return next.isDLQ(); - } - @Override public void duplicateFromStore(Message message, Subscription subscription) { next.duplicateFromStore(message, subscription); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 26e0207e55..893ded39bf 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -796,6 +796,7 @@ public class RegionBroker extends EmptyBroker { if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) { adminContext = BrokerSupport.getConnectionContext(this); } + addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(); BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination); return true; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java index 5affb7223a..7b83dc93d4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -60,8 +60,6 @@ public interface DeadLetterStrategy { */ public void setProcessNonPersistent(boolean processNonPersistent); - public boolean isDLQ(ActiveMQDestination destination); - /** * Allows for a Message that was already processed by a DLQ to be rolled back in case * of a move or a retry of that message, otherwise the Message would be considered a diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 72c0a269be..1dfaa15667 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -168,18 +168,4 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { } } - @Override - public boolean isDLQ(ActiveMQDestination destination) { - String name = destination.getPhysicalName(); - if (destination.isQueue()) { - if ((queuePrefix != null && name.startsWith(queuePrefix)) || (queueSuffix != null && name.endsWith(queueSuffix))) { - return true; - } - } else { - if ((topicPrefix != null && name.startsWith(topicPrefix)) || (topicSuffix != null && name.endsWith(topicSuffix))) { - return true; - } - } - return false; - } } \ No newline at end of file diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java index 42b35ce7b5..41f1f10280 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java @@ -48,12 +48,4 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy { this.deadLetterQueue = deadLetterQueue; } - @Override - public boolean isDLQ(ActiveMQDestination destination) { - if (destination.equals(deadLetterQueue)) { - return true; - } else { - return false; - } - } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java index a80e64e741..f009d62967 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java @@ -169,7 +169,7 @@ public class BrokerDestinationView { * @return true if the destination is a Dead Letter Queue */ public boolean isDLQ() { - return destination.isDLQ(); + return destination.getActiveMQDestination().isDLQ(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java index bb80a8e7af..09d54ab0f5 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java @@ -22,6 +22,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -58,6 +59,7 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da public static final String TOPIC_QUALIFIED_PREFIX = "topic://"; public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://"; public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://"; + public static final String IS_DLQ = "isDLQ"; public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:"; @@ -398,6 +400,17 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da return isPattern; } + public boolean isDLQ() { + return options != null && options.containsKey(IS_DLQ); + } + + public void setDLQ() { + if (options == null) { + options = new HashMap(); + } + options.put(IS_DLQ, String.valueOf(true)); + } + public static UnresolvedDestinationTransformer getUnresolvableDestinationTransformer() { return unresolvableDestinationTransformer; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java index a762f89f6a..07c38f57af 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java @@ -26,7 +26,11 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.ActiveMQDestination; @@ -173,6 +177,14 @@ public abstract class TestSupport extends CombinationTestSupport { regionBroker.getTopicRegion().getDestinationMap(); } + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + BrokerService brokerService = BrokerRegistry.getInstance().lookup("localhost"); + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM }; public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java index 6c31237759..3357eaf442 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.policy; import javax.jms.Destination; import javax.jms.Message; +import javax.jms.Queue; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; @@ -52,6 +53,7 @@ public class DeadLetterTest extends DeadLetterTestSupport { consumeAndRollback(i); } + verifyIsDlq((Queue) dlqDestination); for (int i = 0; i < messageCount; i++) { Message msg = dlqConsumer.receive(1000); assertMessage(msg, i); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java index b275f2ea39..6d05b6d0a8 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java @@ -31,6 +31,7 @@ import javax.jms.Topic; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; @@ -118,11 +119,17 @@ public abstract class DeadLetterTestSupport extends TestSupport { dlqConsumer = session.createConsumer(dlqDestination); } - protected void makeDlqBrowser() throws JMSException { + protected void makeDlqBrowser() throws Exception { dlqDestination = createDlqDestination(); LOG.info("Browsing dead letter on: " + dlqDestination); - dlqBrowser = session.createBrowser((Queue)dlqDestination); + dlqBrowser = session.createBrowser((Queue)dlqDestination); + verifyIsDlq((Queue) dlqDestination); + } + + protected void verifyIsDlq(Queue dlqQ) throws Exception { + final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName()); + assertTrue("is dlq", queueViewMBean.isDLQ()); } protected void sendMessages() throws JMSException { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java index a587be89d4..1f92962bb8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,8 @@ public class IndividualDeadLetterTest extends DeadLetterTest { policy.setDeadLetterStrategy(strategy); PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); + pMap.put(new ActiveMQQueue(getDestinationString()), policy); + pMap.put(new ActiveMQTopic(getDestinationString()), policy); broker.setDestinationPolicy(pMap); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index 647c683886..e3641be327 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -253,11 +253,4 @@ public class MemoryLimitTest extends TestSupport { assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); } - protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { - BrokerService brokerService = BrokerRegistry.getInstance().lookup("localhost"); - ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); - QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() - .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - return proxy; - } }