From 3c3b5bbd74d4ba29cb9ed614ec0bd022c85cc506 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 19 Oct 2011 11:13:19 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3550 - local option for vritual topics git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1186095 13f79535-47bb-0310-9956-ffa450edef68 --- .../SelectorAwareVirtualTopicInterceptor.java | 12 ++++++------ .../broker/region/virtual/VirtualTopic.java | 13 +++++++++++-- .../region/virtual/VirtualTopicInterceptor.java | 6 ++++-- ...rokerVirtualDestDinamicallyIncludedDestTest.java | 12 +++++++----- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java index d7bdef7bb6..c152445bdb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.broker.region.virtual; -import java.io.IOException; -import java.util.List; -import java.util.Set; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; @@ -29,10 +25,14 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import java.io.IOException; +import java.util.List; +import java.util.Set; + public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { - public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix) { - super(next, prefix, postfix); + public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { + super(next, prefix, postfix, local); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 3721459e1a..6146754fe0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -40,6 +40,7 @@ public class VirtualTopic implements VirtualDestination { private String postfix = ""; private String name = ">"; private boolean selectorAware = false; + private boolean local = false; public ActiveMQDestination getVirtualDestination() { @@ -47,8 +48,8 @@ public class VirtualTopic implements VirtualDestination { } public Destination intercept(Destination destination) { - return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix()) : - new VirtualTopicInterceptor(destination, getPrefix(), getPostfix()); + return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : + new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()); } @@ -111,4 +112,12 @@ public class VirtualTopic implements VirtualDestination { public boolean isSelectorAware() { return selectorAware; } + + public boolean isLocal() { + return local; + } + + public void setLocal(boolean local) { + this.local = local; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index eefedd95c6..991a625426 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -33,15 +33,17 @@ public class VirtualTopicInterceptor extends DestinationFilter { private String prefix; private String postfix; + private boolean local; - public VirtualTopicInterceptor(Destination next, String prefix, String postfix) { + public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { super(next); this.prefix = prefix; this.postfix = postfix; + this.local = local; } public void send(ProducerBrokerExchange context, Message message) throws Exception { - if (!message.isAdvisory()) { + if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) { ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination()); send(context, message, queueConsumers); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java index 91349cb801..be175354c6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java @@ -189,19 +189,19 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + //nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - //nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit); nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority); nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions); nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + //nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE)); nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE)); - //nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); + nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE)); } private BrokerService createAndConfigureBroker(URI uri) throws Exception { @@ -211,7 +211,9 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple // make all topics virtual and consumers use the default prefix VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); - virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new VirtualTopic()}); + VirtualTopic vTopic = new VirtualTopic(); + vTopic.setLocal(true); + virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{vTopic}); DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{virtualDestinationInterceptor}; broker.setDestinationInterceptors(destinationInterceptors); return broker;