git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1186095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2011-10-19 11:13:19 +00:00
parent f3e9ec0b81
commit 3c3b5bbd74
4 changed files with 28 additions and 15 deletions

View File

@ -16,10 +16,6 @@
*/
package org.apache.activemq.broker.region.virtual;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
@ -29,10 +25,14 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix) {
super(next, prefix, postfix);
public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next, prefix, postfix, local);
}
/**

View File

@ -40,6 +40,7 @@ public class VirtualTopic implements VirtualDestination {
private String postfix = "";
private String name = ">";
private boolean selectorAware = false;
private boolean local = false;
public ActiveMQDestination getVirtualDestination() {
@ -47,8 +48,8 @@ public class VirtualTopic implements VirtualDestination {
}
public Destination intercept(Destination destination) {
return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix()) :
new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) :
new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal());
}
@ -111,4 +112,12 @@ public class VirtualTopic implements VirtualDestination {
public boolean isSelectorAware() {
return selectorAware;
}
public boolean isLocal() {
return local;
}
public void setLocal(boolean local) {
this.local = local;
}
}

View File

@ -33,15 +33,17 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private String prefix;
private String postfix;
private boolean local;
public VirtualTopicInterceptor(Destination next, String prefix, String postfix) {
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next);
this.prefix = prefix;
this.postfix = postfix;
this.local = local;
}
public void send(ProducerBrokerExchange context, Message message) throws Exception {
if (!message.isAdvisory()) {
if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
}

View File

@ -189,19 +189,19 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple
nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE));
nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
//nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE));
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE));
//nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduit);
nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE));
nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
//nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.QUEUE_TYPE));
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>", ActiveMQDestination.TOPIC_TYPE));
//nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>", ActiveMQDestination.QUEUE_TYPE));
}
private BrokerService createAndConfigureBroker(URI uri) throws Exception {
@ -211,7 +211,9 @@ public class TwoBrokerVirtualDestDinamicallyIncludedDestTest extends JmsMultiple
// make all topics virtual and consumers use the default prefix
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new VirtualTopic()});
VirtualTopic vTopic = new VirtualTopic();
vTopic.setLocal(true);
virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{vTopic});
DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{virtualDestinationInterceptor};
broker.setDestinationInterceptors(destinationInterceptors);
return broker;