From cc81680e10e5c7140ec3e28091df23e9d3c3233b Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 20 Oct 2015 18:15:30 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6027 Adding support for consumers on virtual destinations to create network demand. This behavior is turned off by default but can be enabled. For example, if a consumer comes online for a queue that is part of a VirtualTopic, this will cause a network of brokers to forward messages because a demand subscription will be created. Same for if a consumer comes online for a forwarded destination from a composite destination. There is also an option to enable flow based on the existence of a virtual destination if the virtual destination is forwarding to a Queue. Full configuration instructions for this feature will be on the wiki page. --- .../activemq/advisory/AdvisoryBroker.java | 267 ++++ ...nationFilterVirtualDestinationMatcher.java | 53 + .../advisory/VirtualDestinationMatcher.java | 29 + .../org/apache/activemq/broker/Broker.java | 5 + .../apache/activemq/broker/BrokerFilter.java | 13 + .../apache/activemq/broker/BrokerService.java | 35 + .../apache/activemq/broker/EmptyBroker.java | 11 + .../apache/activemq/broker/ErrorBroker.java | 13 + .../activemq/broker/MutableBrokerFilter.java | 13 + .../region/virtual/CompositeDestination.java | 41 + .../broker/region/virtual/CompositeQueue.java | 5 + .../broker/region/virtual/CompositeTopic.java | 5 + .../broker/region/virtual/VirtualTopic.java | 49 + .../DemandForwardingBridgeSupport.java | 3 +- .../network/NetworkBridgeConfiguration.java | 28 +- .../activemq/advisory/AdvisorySupport.java | 31 + .../activemq/command/NetworkBridgeFilter.java | 8 +- .../plugin/UpdateVirtualDestinationsTask.java | 48 + activemq-unit-tests/pom.xml | 4 + .../network/VirtualConsumerDemandTest.java | 1418 +++++++++++++++++ 20 files changed, 2076 insertions(+), 3 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java create mode 100644 activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 36f5f0b372..bc5f105584 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -18,6 +18,7 @@ package org.apache.activemq.advisory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.SessionId; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter { private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); protected final Map consumers = new LinkedHashMap(); + /** + * This is a set to track all of the virtual destinations that have been added to the broker so + * they can be easily referenced later. + */ + protected final Set virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap()); + /** + * This is a map to track all consumers that exist on the virtual destination so that we can fire + * an advisory later when they go away to remove the demand. + */ + protected final ConcurrentMap virtualDestinationConsumers = new ConcurrentHashMap<>(); + /** + * This is a map to track unique demand for the existence of a virtual destination so we make sure + * we don't send duplicate advisories. + */ + protected final ConcurrentMap brokerConsumerDests = new ConcurrentHashMap<>(); + protected final ConcurrentMap producers = new ConcurrentHashMap(); protected final ConcurrentMap destinations = new ConcurrentHashMap(); protected final ConcurrentMap networkBridges = new ConcurrentHashMap(); @@ -85,6 +104,8 @@ public class AdvisoryBroker extends BrokerFilter { private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); + public AdvisoryBroker(Broker next) { super(next); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); @@ -112,6 +133,15 @@ public class AdvisoryBroker extends BrokerFilter { consumersLock.writeLock().lock(); try { consumers.put(info.getConsumerId(), info); + + //check if this is a consumer on a destination that matches a virtual destination + if (getBrokerService().isUseVirtualDestSubs()) { + for (VirtualDestination virtualDestination : virtualDestinations) { + if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { + fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); + } + } + } } finally { consumersLock.writeLock().unlock(); } @@ -171,6 +201,15 @@ public class AdvisoryBroker extends BrokerFilter { } } + // Replay the virtual destination consumers. + if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { + for (Iterator iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) { + ConsumerInfo key = iter.next(); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination()); + fireConsumerAdvisory(context, key.getDestination(), topic, key); + } + } + // Replay network bridges if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { for (Iterator iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { @@ -199,6 +238,16 @@ public class AdvisoryBroker extends BrokerFilter { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { Destination answer = super.addDestination(context, destination, create); if (!AdvisorySupport.isAdvisoryTopic(destination)) { + //for queues, create demand if isUseVirtualDestSubsOnCreation is true + if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) { + //check if this new destination matches a virtual destination that exists + for (VirtualDestination virtualDestination : virtualDestinations) { + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); + } + } + } + DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo previous = destinations.putIfAbsent(destination, info); if (previous == null) { @@ -228,6 +277,28 @@ public class AdvisoryBroker extends BrokerFilter { super.removeDestination(context, destination, timeout); DestinationInfo info = destinations.remove(destination); if (info != null) { + + //on destination removal, remove all demand if using virtual dest subs + if (getBrokerService().isUseVirtualDestSubs()) { + for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) { + //find all consumers for this virtual destination + VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo); + + //find a consumer that matches this virtualDest and destination + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + //in case of multiple matches + VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); + ConsumerInfo i = brokerConsumerDests.get(key); + if (consumerInfo.equals(i)) { + if (brokerConsumerDests.remove(key) != null) { + fireVirtualDestinationRemoveAdvisory(context, consumerInfo); + break; + } + } + } + } + } + // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate info = info.copy(); info.setDestination(destination); @@ -285,6 +356,11 @@ public class AdvisoryBroker extends BrokerFilter { consumersLock.writeLock().lock(); try { consumers.remove(info.getConsumerId()); + + //remove the demand for this consumer if it matches a virtual destination + if(getBrokerService().isUseVirtualDestSubs()) { + fireVirtualDestinationRemoveAdvisory(context, info); + } } finally { consumersLock.writeLock().unlock(); } @@ -467,6 +543,140 @@ public class AdvisoryBroker extends BrokerFilter { } } + private final IdGenerator connectionIdGenerator = new IdGenerator("advisory"); + private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + super.virtualDestinationAdded(context, virtualDestination); + + if (virtualDestinations.add(virtualDestination)) { + try { + // Don't advise advisory topics. + if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { + + //create demand for consumers on virtual destinations + consumersLock.readLock().lock(); + try { + //loop through existing destinations to see if any match this newly + //created virtual destination + if (getBrokerService().isUseVirtualDestSubsOnCreation()) { + //for matches that are a queue, fire an advisory for demand + for (ActiveMQDestination destination : destinations.keySet()) { + if(destination.isQueue()) { + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); + } + } + } + } + + //loop through existing consumers to see if any of them are consuming on a destination + //that matches the new virtual destination + for (Iterator iter = consumers.values().iterator(); iter.hasNext(); ) { + ConsumerInfo info = iter.next(); + if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { + fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); + } + } + } finally { + consumersLock.readLock().unlock(); + } + } + } catch (Exception e) { + handleFireFailure("virtualDestinationAdded", e); + } + } + } + + private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest, + VirtualDestination virtualDestination) throws Exception { + //if no consumer info, we need to create one - this is the case when an advisory is fired + //because of the existence of a destination matching a virtual destination + if (info == null) { + ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); + SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); + ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + + info = new ConsumerInfo(consumerId); + + //store the virtual destination and the activeMQDestination as a pair so that we can keep track + //of all matching forwarded destinations that caused demand + if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) { + info.setDestination(virtualDestination.getVirtualDestination()); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + + if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + fireConsumerAdvisory(context, info.getDestination(), topic, info); + } + } + //this is the case of a real consumer coming online + } else { + info = info.copy(); + info.setDestination(virtualDestination.getVirtualDestination()); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + + if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + fireConsumerAdvisory(context, info.getDestination(), topic, info); + } + } + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + super.virtualDestinationRemoved(context, virtualDestination); + + if (virtualDestinations.remove(virtualDestination)) { + try { + consumersLock.readLock().lock(); + try { + // remove the demand created by the addition of the virtual destination + if (getBrokerService().isUseVirtualDestSubsOnCreation()) { + if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { + for (ConsumerInfo info : virtualDestinationConsumers.keySet()) { + //find all consumers for this virtual destination + if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { + fireVirtualDestinationRemoveAdvisory(context, info); + } + + //check consumers created for the existence of a destination to see if they + //match the consumerinfo and clean up + for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { + ConsumerInfo i = brokerConsumerDests.get(activeMQDest); + if (info.equals(i)) { + brokerConsumerDests.remove(activeMQDest); + } + } + } + } + } + } finally { + consumersLock.readLock().unlock(); + } + } catch (Exception e) { + handleFireFailure("virtualDestinationAdded", e); + } + } + } + + private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context, + ConsumerInfo info) throws Exception { + + VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); + if (virtualDestination != null) { + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); + + ActiveMQDestination dest = info.getDestination(); + + if (!dest.isTemporary() || destinations.containsKey(dest)) { + fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); + } + } + } + @Override public void isFull(ConnectionContext context, Destination destination, Usage usage) { super.isFull(context, destination, usage); @@ -681,4 +891,61 @@ public class AdvisoryBroker extends BrokerFilter { public Map getAdvisoryDestinations() { return destinations; } + + private class VirtualConsumerPair { + private final VirtualDestination virtualDestination; + + //destination that matches this virtualDestination as part target + //this is so we can keep track of more than one destination that might + //match the virtualDestination and cause demand + private final ActiveMQDestination activeMQDestination; + + public VirtualConsumerPair(VirtualDestination virtualDestination, + ActiveMQDestination activeMQDestination) { + super(); + this.virtualDestination = virtualDestination; + this.activeMQDestination = activeMQDestination; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime + * result + + ((activeMQDestination == null) ? 0 : activeMQDestination + .hashCode()); + result = prime + * result + + ((virtualDestination == null) ? 0 : virtualDestination + .hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + VirtualConsumerPair other = (VirtualConsumerPair) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (activeMQDestination == null) { + if (other.activeMQDestination != null) + return false; + } else if (!activeMQDestination.equals(other.activeMQDestination)) + return false; + if (virtualDestination == null) { + if (other.virtualDestination != null) + return false; + } else if (!virtualDestination.equals(other.virtualDestination)) + return false; + return true; + } + private AdvisoryBroker getOuterType() { + return AdvisoryBroker.this; + } + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java new file mode 100644 index 0000000000..5c57cf0b22 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.broker.region.virtual.CompositeDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.filter.DestinationFilter; + +/** + * This class will use a destination filter to see if the activeMQ destination matches + * the given virtual destination + * + */ +public class DestinationFilterVirtualDestinationMatcher implements VirtualDestinationMatcher { + + /* (non-Javadoc) + * @see org.apache.activemq.advisory.VirtualDestinationMatcher#matches(org.apache.activemq.broker.region.virtual.VirtualDestination) + */ + @Override + public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { + if (virtualDestination instanceof CompositeDestination) { + DestinationFilter filter = DestinationFilter.parseFilter(virtualDestination.getMappedDestinations()); + if (filter.matches(activeMQDest)) { + return true; + } + } else if (virtualDestination instanceof VirtualTopic) { + DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); + if (filter.matches(activeMQDest)) { + return true; + } + } + + return false; + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java new file mode 100644 index 0000000000..571a311a18 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * + * + */ +public interface VirtualDestinationMatcher { + + public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest); +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index fa8e4fdd28..87cb3bcd2f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -385,6 +386,10 @@ public interface Broker extends Region, Service { */ void isFull(ConnectionContext context,Destination destination,Usage usage); + void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination); + + void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination); + /** * called when the broker becomes the master in a master/slave * configuration diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 132b46dc4e..2a8ae718da 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -359,6 +360,18 @@ public class BrokerFilter implements Broker { next.slowConsumer(context, destination,subs); } + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + next.virtualDestinationAdded(context, virtualDestination); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + next.virtualDestinationRemoved(context, virtualDestination); + } + @Override public void nowMasterBroker() { next.nowMasterBroker(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 61a4cefd67..5e7dd979ca 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -203,6 +203,15 @@ public class BrokerService implements Service { private boolean useVirtualTopics = true; private boolean useMirroredQueues = false; private boolean useTempMirroredQueues = true; + /** + * Whether or not virtual destination subscriptions should cause network demand + */ + private boolean useVirtualDestSubs = false; + /** + * Whether or no the creation of destinations that match virtual destinations + * should cause network demand + */ + private boolean useVirtualDestSubsOnCreation = false; private BrokerId brokerId; private volatile DestinationInterceptor[] destinationInterceptors; private ActiveMQDestination[] destinations; @@ -2699,6 +2708,14 @@ public class BrokerService implements Service { if (virtualDestination instanceof VirtualTopic) { consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); } + if (isUseVirtualDestSubs()) { + try { + broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); + LOG.debug("Adding virtual destination: {}", virtualDestination); + } catch (Exception e) { + LOG.warn("Could not fire virtual destination consumer advisory", e); + } + } } } } @@ -3133,4 +3150,22 @@ public class BrokerService implements Service { public void setRejectDurableConsumers(boolean rejectDurableConsumers) { this.rejectDurableConsumers = rejectDurableConsumers; } + + public boolean isUseVirtualDestSubs() { + return useVirtualDestSubs; + } + + public void setUseVirtualDestSubs( + boolean useVirtualDestSubs) { + this.useVirtualDestSubs = useVirtualDestSubs; + } + + public boolean isUseVirtualDestSubsOnCreation() { + return useVirtualDestSubsOnCreation; + } + + public void setUseVirtualDestSubsOnCreation( + boolean useVirtualDestSubsOnCreation) { + this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 8185554ac4..c4059a08f5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -345,6 +346,16 @@ public class EmptyBroker implements Broker { public void slowConsumer(ConnectionContext context,Destination destination, Subscription subs) { } + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + } + @Override public void nowMasterBroker() { } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index ae42141e95..35501e3150 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -361,6 +362,18 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + throw new BrokerStoppedException(this.message); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + throw new BrokerStoppedException(this.message); + } + @Override public void nowMasterBroker() { throw new BrokerStoppedException(this.message); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 2eea2e840b..6306325fec 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -370,6 +371,18 @@ public class MutableBrokerFilter implements Broker { getNext().slowConsumer(context, dest,subs); } + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + getNext().virtualDestinationAdded(context, virtualDestination); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + getNext().virtualDestinationRemoved(context, virtualDestination); + } + @Override public void nowMasterBroker() { getNext().nowMasterBroker(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java index 56588393ab..1b976c01c8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java @@ -143,4 +143,45 @@ public abstract class CompositeDestination implements VirtualDestination { } }; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (concurrentSend ? 1231 : 1237); + result = prime * result + (copyMessage ? 1231 : 1237); + result = prime * result + (forwardOnly ? 1231 : 1237); + result = prime * result + + ((forwardTo == null) ? 0 : forwardTo.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CompositeDestination other = (CompositeDestination) obj; + if (concurrentSend != other.concurrentSend) + return false; + if (copyMessage != other.copyMessage) + return false; + if (forwardOnly != other.forwardOnly) + return false; + if (forwardTo == null) { + if (other.forwardTo != null) + return false; + } else if (!forwardTo.equals(other.forwardTo)) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + return true; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java index 1b0f75dae4..d253d9f3db 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java @@ -38,4 +38,9 @@ public class CompositeQueue extends CompositeDestination { // nothing to do for mapped destinations return destination; } + + @Override + public String toString() { + return "CompositeQueue [" + getName() + "]"; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java index 667a80cee9..9b817d0b9c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java @@ -41,4 +41,9 @@ public class CompositeTopic extends CompositeDestination { } return destination; } + + @Override + public String toString() { + return "CompositeTopic [" + getName() + "]"; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 14ea3fe34f..7049ccbc60 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -194,4 +194,53 @@ public class VirtualTopic implements VirtualDestination { public void setTransactedSend(boolean transactedSend) { this.transactedSend = transactedSend; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (concurrentSend ? 1231 : 1237); + result = prime * result + (local ? 1231 : 1237); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((postfix == null) ? 0 : postfix.hashCode()); + result = prime * result + ((prefix == null) ? 0 : prefix.hashCode()); + result = prime * result + (selectorAware ? 1231 : 1237); + result = prime * result + (transactedSend ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + VirtualTopic other = (VirtualTopic) obj; + if (concurrentSend != other.concurrentSend) + return false; + if (local != other.local) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (postfix == null) { + if (other.postfix != null) + return false; + } else if (!postfix.equals(other.postfix)) + return false; + if (prefix == null) { + if (other.prefix != null) + return false; + } else if (!prefix.equals(other.prefix)) + return false; + if (selectorAware != other.selectorAware) + return false; + if (transactedSend != other.transactedSend) + return false; + return true; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index ad6fd61115..fac39ac4c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1352,7 +1352,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { - if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { + if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || + AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { sub.getLocalInfo().setDispatchAsync(true); } else { sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 3a59f3031c..09127a1c85 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.ConsumerInfo; public class NetworkBridgeConfiguration { private boolean conduitSubscriptions = true; + private boolean useVirtualDestSubs; private boolean dynamicOnly; private boolean dispatchAsync = true; private boolean decreaseNetworkConsumerPriority; @@ -237,11 +238,27 @@ public class NetworkBridgeConfiguration { filter.append("."); filter.append(destination.getPhysicalName()); delimiter = ","; + + if (useVirtualDestSubs) { + filter.append(delimiter); + filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(destination.getDestinationTypeAsString()); + filter.append("."); + filter.append(destination.getPhysicalName()); + } } } return filter.toString(); } else { - return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; + StringBuffer filter = new StringBuffer(); + filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(">"); + if (useVirtualDestSubs) { + filter.append(","); + filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(">"); + } + return filter.toString(); } } else { // prepend consumer advisory prefix @@ -449,4 +466,13 @@ public class NetworkBridgeConfiguration { this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex; } + public boolean isUseVirtualDestSus() { + return useVirtualDestSubs; + } + + public void setUseVirtualDestSubs( + boolean useVirtualDestSubs) { + this.useVirtualDestSubs = useVirtualDestSubs; + } + } diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index b26c600f8f..ac3ee0338a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -37,8 +37,11 @@ public final class AdvisorySupport { public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; + public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer."; public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; + public static final String QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; + public static final String TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic."; public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue."; public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic."; @@ -116,6 +119,16 @@ public final class AdvisorySupport { return getAdvisoryTopic(destination, prefix, true); } + public static ActiveMQTopic getVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) { + String prefix; + if (destination.isQueue()) { + prefix = QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX; + } else { + prefix = TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX; + } + return getAdvisoryTopic(destination, prefix, true); + } + public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } @@ -389,6 +402,24 @@ public final class AdvisorySupport { } } + public static boolean isVirtualDestinationConsumerAdvisoryTopic(Destination destination) throws JMSException { + return isVirtualDestinationConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); + } + + public static boolean isVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) { + if (destination.isComposite()) { + ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); + for (int i = 0; i < compositeDestinations.length; i++) { + if (isVirtualDestinationConsumerAdvisoryTopic(compositeDestinations[i])) { + return true; + } + } + return false; + } else { + return destination.isTopic() && destination.getPhysicalName().startsWith(VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + } + } + public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java index 245c09840e..5bd80b0740 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java @@ -51,14 +51,17 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { this.consumerInfo = consumerInfo; } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public boolean isMarshallAware() { return false; } + @Override public boolean matches(MessageEvaluationContext mec) throws JMSException { try { // for Queues - the message can be acknowledged and dropped whilst @@ -72,6 +75,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } } + @Override public Object evaluate(MessageEvaluationContext message) throws JMSException { return matches(message) ? Boolean.TRUE : Boolean.FALSE; } @@ -125,7 +129,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) { - return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); + return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || + AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) || + AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); } public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java index cd0121cfc8..ef7e1b509a 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java @@ -18,16 +18,23 @@ package org.apache.activemq.plugin; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class UpdateVirtualDestinationsTask implements Runnable { + public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class); private final AbstractRuntimeConfigurationBroker plugin; public UpdateVirtualDestinationsTask( @@ -49,11 +56,52 @@ public abstract class UpdateVirtualDestinationsTask implements Runnable { // update existing interceptor final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor; + Set existingVirtualDests = new HashSet<>(); + Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations()); + + Set newVirtualDests = new HashSet<>(); + Collections.addAll(newVirtualDests, getVirtualDestinations()); + + Set addedVirtualDests = new HashSet<>(); + Set removedVirtualDests = new HashSet<>(); + //detect new virtual destinations + for (VirtualDestination newVirtualDest : newVirtualDests) { + if (!existingVirtualDests.contains(newVirtualDest)) { + addedVirtualDests.add(newVirtualDest); + } + } + //detect removed virtual destinations + for (VirtualDestination existingVirtualDest : existingVirtualDests) { + if (!newVirtualDests.contains(existingVirtualDest)) { + removedVirtualDests.add(existingVirtualDest); + } + } + virtualDestinationInterceptor .setVirtualDestinations(getVirtualDestinations()); plugin.info("applied updates to: " + virtualDestinationInterceptor); updatedExistingInterceptor = true; + + ConnectionContext connectionContext; + try { + connectionContext = plugin.getBrokerService().getAdminConnectionContext(); + //signal updates + if (plugin.getBrokerService().isUseVirtualDestSubs()) { + for (VirtualDestination removedVirtualDest : removedVirtualDests) { + plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest); + LOG.info("Removing virtual destination: {}", removedVirtualDest); + } + + for (VirtualDestination addedVirtualDest : addedVirtualDests) { + plugin.virtualDestinationAdded(connectionContext, addedVirtualDest); + LOG.info("Adding virtual destination: {}", addedVirtualDest); + } + } + + } catch (Exception e) { + LOG.warn("Could not process virtual destination advisories", e); + } } } diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 5df7fcffc6..e03b2463c8 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -70,6 +70,10 @@ org.apache.activemq activemq-partition + + org.apache.activemq + activemq-runtime-config + org.apache.geronimo.specs geronimo-jms_1.1_spec diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java new file mode 100644 index 0000000000..3439ccbc1b --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -0,0 +1,1418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.virtual.CompositeQueue; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * This test is to show that dynamicallyIncludedDestinations will work properly + * when a network of brokers is configured to treat Virtual Destinations (Virtual topic and composite destination) + * as demand. + */ +@RunWith(Parameterized.class) +public class VirtualConsumerDemandTest { + + protected static final int MESSAGE_COUNT = 10; + private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class); + + + /** + * test params + */ + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + //not duplex, useVirtualDestSubsOnCreation + {false, true}, + //duplex + {true, false}, + {true, true}, + {false, false} + }); + } + + protected Connection localConnection; + protected Connection remoteConnection; + protected BrokerService localBroker; + protected BrokerService remoteBroker; + protected JavaRuntimeConfigurationBroker runtimeBroker; + protected Session localSession; + protected Session remoteSession; + protected ActiveMQTopic included; + protected ActiveMQTopic excluded; + protected String consumerName = "durableSubs"; + protected String testTopicName = "include.test.bar"; + protected String testQueueName = "include.test.foo"; + + private final boolean isDuplex; + private final boolean isUseVirtualDestSubsOnCreation; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(new File("target")); + + + public VirtualConsumerDemandTest(boolean isDuplex, boolean isUseVirtualDestSubsOnCreation) { + // Assume.assumeTrue( + super(); + this.isDuplex = isDuplex; + this.isUseVirtualDestSubsOnCreation = isUseVirtualDestSubsOnCreation; + } + + + /** + * Test that the creation of a virtual topic will cause demand + * even without a consumer for the case of useVirtualDestSubsOnCreation == true + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testVirtualTopic() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>"); + + MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar")); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics(); + + //this will create the destination so messages accumulate + final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics(); + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + //assert statistics + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteStats.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,1,1); + } + + + + /** + * Test that the creation of a virtual topic with a consumer will cause + * demand regardless of useVirtualDestSubsOnCreation + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testVirtualTopicWithConsumer() throws Exception { + doSetUp(true, null); + + //use just the default virtual topic setup + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>"); + + MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar")); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")); + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + assertRemoteAdvisoryCount(advisoryConsumer, 2, 1); + + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + } + + + /** + * Test that when a consumer goes offline for a virtual topic, that messages still flow + * to that queue if isUseVirtualDestSubsOnCreation is true + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testVirtualTopicWithConsumerGoOffline() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + //use just the default virtual topic setup + doSetUp(true, null); + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>"); + + MessageProducer includedProducer = localSession.createProducer(new ActiveMQTopic("VirtualTopic.include.test.bar")); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQTopic("VirtualTopic.include.test.bar")).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")); + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + //assert a message was forwarded + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + //close the consumer and send a second message + bridgeConsumer.close(); + Thread.sleep(2000); + includedProducer.send(test); + + //check that the message was forwarded + waitForDispatchFromLocalBroker(destinationStatistics, 2); + assertLocalBrokerStatistics(destinationStatistics, 2); + + //make sure that the message can be received + MessageConsumer bridgeConsumer2 = remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")); + assertNotNull(bridgeConsumer2.receive(5000)); + + //should be 4 advisories...1 or the virtual destination creation to a queue, + //2 for new consumers, and 1 for a closed consumer + assertRemoteAdvisoryCount(advisoryConsumer, 4); + assertAdvisoryBrokerCounts(1,2,1); + } + + /** + * This test shows that if isUseVirtualDestSubsOnCreation is true, + * the creation of a composite destination that forwards to a Queue will create + * a virtual consumer and cause demand so that the queue will accumulate messages + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testDynamicFlow() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,1,1); + } + + + /** + * Test that dynamic flow works for virtual destinations when a second composite + * topic is included that forwards to the same queue, but is excluded from + * being forwarded from the remote broker + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testSecondNonIncludedCompositeTopic() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a composite topic that isn't included + CompositeTopic compositeTopic = createCompositeTopic("include.test.bar2", + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + Thread.sleep(2000); + + //add one that is included + CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic, compositeTopic2}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(2,2,2); + + } + + /** + * Test that no messages are forwarded when isUseVirtualDestSubsOnCreation is false + * and there are no consumers + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testNoUseVirtualDestinationSubscriptionsOnCreation() throws Exception { + Assume.assumeTrue(!isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + includedProducer.send(test); + Thread.sleep(2000); + + waitForDispatchFromLocalBroker(destinationStatistics, 0); + assertLocalBrokerStatistics(destinationStatistics, 0); + assertEquals("remote dest messages", 0, remoteDestStatistics.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 0); + assertAdvisoryBrokerCounts(1,0,0); + + } + + + /** + * Test that messages still flow when updating a composite topic to remove 1 of the + * forwarded destinations, but keep the other one + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testTwoTargetsRemove1() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" and "include.test.bar.bridge2" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge"), + new ActiveMQQueue("include.test.bar.bridge2")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics(); + + Thread.sleep(2000); + //two advisory messages sent for each target when destinations are created + assertRemoteAdvisoryCount(advisoryConsumer, 2); + assertAdvisoryBrokerCounts(1,2,2); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + assertEquals("remote2 dest messages", 1, remoteDestStatistics2.getMessages().getCount()); + + compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); + Thread.sleep(2000); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 2); + assertLocalBrokerStatistics(destinationStatistics, 2); + + assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount()); + assertEquals("remote2 dest messages", 1, remoteDestStatistics2.getMessages().getCount()); + + //We delete 2, and re-add 1 target queue + assertRemoteAdvisoryCount(advisoryConsumer, 3); + assertAdvisoryBrokerCounts(1,1,1); + + } + + /** + * Test that messages still flow after removing one of the destinations that is a target + * but the other one sticks around + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testTwoTargetsRemove1Destination() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" and "include.test.bar.bridge2" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge"), + new ActiveMQQueue("include.test.bar.bridge2")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + assertEquals("remote2 dest messages", 1, remoteDestStatistics2.getMessages().getCount()); + + remoteBroker.removeDestination(new ActiveMQQueue("include.test.bar.bridge2")); + Thread.sleep(2000); + //2 for each target queue destination in the virtual subscription + //1 for the removal of a queue + assertRemoteAdvisoryCount(advisoryConsumer, 3); + assertAdvisoryBrokerCounts(1,1,1); + + includedProducer.send(test); + + //make sure messages are still forwarded even after 1 target was deleted + waitForDispatchFromLocalBroker(destinationStatistics, 2); + assertLocalBrokerStatistics(destinationStatistics, 2); + + assertEquals("remote dest messages", 2, remoteDestStatistics.getMessages().getCount()); + + //1 because a send causes the queue to be recreated again which sends a new demand advisory + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,2,2); + + } + + /** + * Test that demand is destroyed after removing both targets from the composite Topic + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testTwoTargetsRemoveBoth() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" and "include.test.bar.bridge2" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge"), + new ActiveMQQueue("include.test.bar.bridge2")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge2")).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + assertEquals("remote2 dest messages", 1, remoteDestStatistics2.getMessages().getCount()); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {}, true); + Thread.sleep(2000); + includedProducer.send(test); + + Thread.sleep(2000); + assertLocalBrokerStatistics(destinationStatistics, 1); + + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + assertEquals("remote2 dest messages", 1, remoteDestStatistics2.getMessages().getCount()); + + //2 for each target queue destination in the virtual subscription + //2 for the removal of the virtual destination, which requires 2 advisories because there are 2 targets + assertRemoteAdvisoryCount(advisoryConsumer, 4); + assertAdvisoryBrokerCounts(0,0,0); + } + + /** + * Test that dynamic flow works when the destination is created before the + * virtual destination has been added to the broker + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testDestinationAddedFirst() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), + new ActiveMQQueue("include.test.bar.bridge"), false); + + Thread.sleep(2000); + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); + assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,1,1); + } + + /** + * This test shows that a consumer listening on the target of a composite destination will create + * a virtual consumer and cause demand so that the consumer will receive messages, regardless + * of whether isUseVirtualDestSubsOnCreation is true or false + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testWithConsumer() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + //should only be 1 because of conduit subs even though there is 2 consumers + //for the case where isUseVirtualDestSubsOnCreation is true, + //1 for the composite destination creation and 1 for the actual consumer + return 1 == destinationStatistics.getConsumers().getCount(); + } + }); + + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + + assertLocalBrokerStatistics(destinationStatistics, 1); + + //if isUseVirtualDestSubsOnCreation is true we should have + //two advisory consumer messages, else 1 + assertRemoteAdvisoryCount(advisoryConsumer, 2, 1); + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + + } + + /** + * Test that demand still exists when only 1 of 2 consumers is removed from the + * destination + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testWith2ConsumersRemove1() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + MessageConsumer bridgeConsumer2 = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + + //should only be 1 because of conduit subs even though there is 2 consumers + //for the case where isUseVirtualDestSubsOnCreation is true, + //1 for the composite destination creation and 1 for the actual consumer + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertTrue(bridgeConsumer.receive(5000) != null || bridgeConsumer2.receive(5000) != null); + + assertLocalBrokerStatistics(destinationStatistics, 1); + + bridgeConsumer2.close(); + + includedProducer.send(test); + + //make sure the message is still forwarded + waitForDispatchFromLocalBroker(destinationStatistics, 2); + assertLocalBrokerStatistics(destinationStatistics, 2); + assertNotNull(bridgeConsumer.receive(5000)); + + assertRemoteAdvisoryCount(advisoryConsumer, 4, 3); + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + } + + /** + * Test that demand is removed after both consumers are removed when + * isUseVirtualDestSubsOnCreation is false + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testWith2ConsumersRemoveBoth() throws Exception { + Assume.assumeTrue(!isUseVirtualDestSubsOnCreation); + + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to queue "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + MessageConsumer bridgeConsumer2 = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + + //should only be 1 because of conduit subs even though there is 2 consumers + //for the case where isUseVirtualDestSubsOnCreation is true, + //1 for the composite destination creation and 1 for the actual consumer + waitForConsumerCount(destinationStatistics, 1); + assertAdvisoryBrokerCounts(1,2,0); + + includedProducer.send(test); + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertTrue(bridgeConsumer.receive(5000) != null || bridgeConsumer2.receive(5000) != null); + + assertLocalBrokerStatistics(destinationStatistics, 1); + + bridgeConsumer.close(); + bridgeConsumer2.close(); + + Thread.sleep(2000); + includedProducer.send(test); + Thread.sleep(2000); + + assertLocalBrokerStatistics(destinationStatistics, 1); + + //in this test, virtual destinations don't cause demand, only consumers on them + //so we should have 2 create and 2 destroy + assertRemoteAdvisoryCount(advisoryConsumer, 4); + assertAdvisoryBrokerCounts(1,0,0); + + } + + /** + * Show that messages won't be send for an excluded destination + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testExcluded() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages to an excluded destination + CompositeTopic compositeTopic = createCompositeTopic("excluded.test.bar", + new ActiveMQQueue("excluded.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(excluded); + // allow for consumer infos to perculate arround + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("excluded.test.bar.bridge")); + Thread.sleep(2000); + includedProducer.send(test); + assertNull(bridgeConsumer.receive(5000)); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(excluded).getDestinationStatistics(); + assertEquals("broker consumer count", 0, destinationStatistics.getConsumers().getCount()); + + assertLocalBrokerStatistics(destinationStatistics, 0); + + assertRemoteAdvisoryCount(advisoryConsumer, 0); + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + + } + + /** + * Test that demand will be created when using a composite queue instead of a composite topic + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testSourceQueue() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getQueueVirtualDestinationAdvisoryConsumer(testQueueName); + + //configure a virtual destination that forwards messages from queue testQueueName + //to topic "include.test.foo.bridge" + CompositeQueue compositeQueue = createCompositeQueue(testQueueName, + new ActiveMQQueue("include.test.foo.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeQueue}); + + MessageProducer includedProducer = localSession.createProducer(new ActiveMQQueue(testQueueName)); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics(); + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.foo.bridge")); + waitForConsumerCount(destinationStatistics, 1); + + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + final DestinationStatistics remoteStats = remoteBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics(); + + waitForDispatchFromLocalBroker(destinationStatistics, 1); + + //should only be 1 because of conduit subs + assertEquals("broker consumer count", 1, destinationStatistics.getConsumers().getCount()); + + assertLocalBrokerStatistics(destinationStatistics, 1); + + //check remote stats - confirm the message isn't on the remote queue and was forwarded only + //since that's how the composite queue was set up + assertEquals("message count", 0, remoteStats.getMessages().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 2, 1); + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + } + + + /** + * Test that the demand will be removed if the virtual destination is deleted + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testFlowRemoved() throws Exception { + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + doSetUp(true, new VirtualDestination[] {compositeTopic}); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //sleep to allow the route to be set up + Thread.sleep(2000); + + remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), + new ActiveMQQueue("include.test.bar.bridge"), false); + + Thread.sleep(2000); + + //remove the virtual destinations after startup + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {}, true); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + //assert that no message was received + //by the time we get here, there is no more virtual destinations so this won't + //trigger demand + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + Thread.sleep(2000); + includedProducer.send(test); + assertNull(bridgeConsumer.receive(5000)); + + assertRemoteAdvisoryCount(advisoryConsumer, 2, 0); + assertAdvisoryBrokerCounts(0,0,0); + } + + @Test(timeout = 60 * 1000) + public void testReplay() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + doSetUp(true, new VirtualDestination[] {compositeTopic}, false); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + Thread.sleep(2000); + + remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), + new ActiveMQQueue("include.test.bar.bridge"), false); + + Thread.sleep(2000); + + //start the local broker after establishing the virtual topic to test replay + localBroker.addNetworkConnector(connector); + connector.start(); + + Thread.sleep(2000); + + //there should be an extra advisory because of replay + assertRemoteAdvisoryCount(advisoryConsumer, 2); + assertAdvisoryBrokerCounts(1,1,1); + } + + @Test(timeout = 60 * 1000) + public void testReplayWithConsumer() throws Exception { + + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + doSetUp(true, new VirtualDestination[] {compositeTopic}, false); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + Thread.sleep(2000); + + remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), + new ActiveMQQueue("include.test.bar.bridge"), false); + + Thread.sleep(2000); + + MessageProducer includedProducer = localSession.createProducer(included); + Message test = localSession.createTextMessage("test"); + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge")); + Thread.sleep(2000); + + //start the local broker after establishing the virtual topic to test replay + localBroker.addNetworkConnector(connector); + connector.start(); + + Thread.sleep(2000); + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + //with isUseVirtualDestSubsOnCreation is true, there should be 4 advisories (2 replay) + //with !isUseVirtualDestSubsOnCreation, there should be 2 advisories (1 replay) + assertRemoteAdvisoryCount(advisoryConsumer, 4, 2); + if (isUseVirtualDestSubsOnCreation) { + assertAdvisoryBrokerCounts(1,2,1); + } else { + assertAdvisoryBrokerCounts(1,1,0); + } + } + + /** + * Test that the demand will be removed if the virtual destination is deleted + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testRemovedIfNoConsumer() throws Exception { + Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQQueue("include.test.bar.bridge")); + + doSetUp(true, new VirtualDestination[] {compositeTopic}); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + Thread.sleep(2000); + + //destination creation will trigger the advisory since the virtual topic exists + final DestinationStatistics destinationStatistics = + localBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics(); + final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( + new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + + Thread.sleep(2000); + assertAdvisoryBrokerCounts(1,1,1); + + //remove the virtual destinations after startup, will trigger a remove advisory + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + includedProducer.send(test); + + assertEquals("broker consumer count", 0, destinationStatistics.getConsumers().getCount()); + assertLocalBrokerStatistics(destinationStatistics, 0); + assertEquals("remote dest messages", 0, remoteDestStatistics.getMessages().getCount()); + + //one add and one remove advisory + assertRemoteAdvisoryCount(advisoryConsumer, 2); + assertAdvisoryBrokerCounts(0,0,0); + } + + + /** + * Test that demand is created when the target of the compositeTopic is another topic + * and a consumer comes online + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testToTopic() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to topic "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQTopic("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQTopic("include.test.bar.bridge")); + Thread.sleep(2000); + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,1,0); + } + + /** + * Test that demand is NOT created when the target of the compositeTopic is another topic + * and there are no consumers since the existience of a topic shouldn't case demand without + * a consumer or durable on it + * + * @throws Exception + */ + @Test(timeout = 60 * 1000) + public void testToTopicNoConsumer() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to topic "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQTopic("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + includedProducer.send(test); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(excluded).getDestinationStatistics(); + assertEquals("broker consumer count", 0, destinationStatistics.getConsumers().getCount()); + assertLocalBrokerStatistics(destinationStatistics, 0); + + assertRemoteAdvisoryCount(advisoryConsumer, 0); + assertAdvisoryBrokerCounts(1,0,0); + } + + /** + * Test that demand will be created because of the existing of a durable subscription + * created on a topic that is the target of a compositeTopic + */ + @Test(timeout = 60 * 1000) + public void testToTopicWithDurable() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to topic "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQTopic("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + MessageConsumer bridgeConsumer = remoteSession.createDurableSubscriber( + new ActiveMQTopic("include.test.bar.bridge"), "sub1"); + Thread.sleep(2000); + includedProducer.send(test); + assertNotNull(bridgeConsumer.receive(5000)); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == destinationStatistics.getDequeues().getCount(); + } + }); + + assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); + assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 1); + assertAdvisoryBrokerCounts(1,1,0); + + } + + /** + * Test that messages still flow to the durable subscription on the forwarded + * destination even if it is offline + */ + @Test(timeout = 60 * 1000) + public void testToTopicWithDurableOffline() throws Exception { + doSetUp(true, null); + + MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + + //configure a virtual destination that forwards messages from topic testQueueName + //to topic "include.test.bar.bridge" + CompositeTopic compositeTopic = createCompositeTopic(testTopicName, + new ActiveMQTopic("include.test.bar.bridge")); + + runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); + + MessageProducer includedProducer = localSession.createProducer(included); + Thread.sleep(2000); + Message test = localSession.createTextMessage("test"); + + final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); + + //create a durable subscription and go offline + MessageConsumer bridgeConsumer = remoteSession.createDurableSubscriber( + new ActiveMQTopic("include.test.bar.bridge"), "sub1"); + bridgeConsumer.close(); + Thread.sleep(2000); + includedProducer.send(test); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == destinationStatistics.getDequeues().getCount() && + destinationStatistics.getDispatched().getCount() == 1; + } + }); + + //offline durable should still get receive the message over the bridge and ack + assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); + assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + + //reconnect to receive the message + MessageConsumer bridgeConsumer2 = remoteSession.createDurableSubscriber( + new ActiveMQTopic("include.test.bar.bridge"), "sub1"); + assertNotNull(bridgeConsumer2.receive(5000)); + + Thread.sleep(2000); + //make sure stats did not change + assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); + assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + + assertRemoteAdvisoryCount(advisoryConsumer, 3); + assertAdvisoryBrokerCounts(1,1,0); + + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + protected void doTearDown() throws Exception { + if (localConnection != null) { + localConnection.close(); + } + if (remoteConnection != null) { + remoteConnection.close(); + } + if (localBroker != null) { + localBroker.stop(); + } + if (remoteBroker != null) { + remoteBroker.stop(); + } + } + + + protected void doSetUp(boolean deleteAllMessages, + VirtualDestination[] remoteVirtualDests) throws Exception { + doSetUp(deleteAllMessages, remoteVirtualDests, true); + } + + protected void doSetUp(boolean deleteAllMessages, + VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception { + remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + localBroker = createLocalBroker(startNetworkConnector); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID("clientId"); + localConnection.start(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("clientId"); + remoteConnection.start(); + included = new ActiveMQTopic(testTopicName); + excluded = new ActiveMQTopic("exclude.test.bar"); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + + protected NetworkConnector connector; + protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setMonitorConnectionSplits(true); + brokerService.setDataDirectoryFile(tempFolder.newFolder()); + brokerService.setBrokerName("localBroker"); + + connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)")); + connector.setName("networkConnector"); + connector.setDynamicOnly(false); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(isDuplex); + connector.setUseVirtualDestSubs(true); + connector.setDynamicallyIncludedDestinations(Lists.newArrayList(new ActiveMQQueue(testQueueName), + new ActiveMQTopic(testTopicName), new ActiveMQTopic("VirtualTopic.>"))); + connector.setExcludedDestinations(Lists.newArrayList(new ActiveMQQueue("exclude.test.foo"), + new ActiveMQTopic("exclude.test.bar"))); + + if (startNetworkConnector) { + brokerService.addNetworkConnector(connector); + } + + brokerService.addConnector("tcp://localhost:61616"); + + return brokerService; + } + + protected AdvisoryBroker remoteAdvisoryBroker; + protected BrokerService createRemoteBroker(boolean isUsevirtualDestinationSubscriptionsOnCreation, + VirtualDestination[] remoteVirtualDests) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(tempFolder.newFolder()); + brokerService.setPlugins(new BrokerPlugin[] {new JavaRuntimeConfigurationPlugin()}); + brokerService.setUseVirtualDestSubs(true); + brokerService.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation); + + //apply interceptor before getting the broker, which will cause it to be built + if (remoteVirtualDests != null) { + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(remoteVirtualDests); + brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + } + + runtimeBroker = (JavaRuntimeConfigurationBroker) + brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + remoteAdvisoryBroker = (AdvisoryBroker) + brokerService.getBroker().getAdaptor(AdvisoryBroker.class); + + NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61616)")); + brokerService.addNetworkConnector(connector); + + brokerService.addConnector("tcp://localhost:61617"); + + + + return brokerService; + } + + protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination...forwardTo) { + CompositeTopic compositeTopic = new CompositeTopic(); + compositeTopic.setName(name); + compositeTopic.setForwardOnly(true); + compositeTopic.setForwardTo( Lists.newArrayList(forwardTo)); + + return compositeTopic; + } + + protected CompositeQueue createCompositeQueue(String name, ActiveMQDestination...forwardTo) { + CompositeQueue compositeQueue = new CompositeQueue(); + compositeQueue.setName(name); + compositeQueue.setForwardOnly(true); + compositeQueue.setForwardTo( Lists.newArrayList(forwardTo)); + + return compositeQueue; + } + + protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + //should only be 1 for the composite destination creation + return count == destinationStatistics.getConsumers().getCount(); + } + }); + } + + protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == destinationStatistics.getDequeues().getCount() && + count == destinationStatistics.getDispatched().getCount() && + count == destinationStatistics.getForwards().getCount(); + } + }); + } + + protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String topic) throws JMSException { + return remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic( + new ActiveMQTopic(topic))); + } + + protected MessageConsumer getQueueVirtualDestinationAdvisoryConsumer(String queue) throws JMSException { + return remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic( + new ActiveMQQueue(queue))); + } + + protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { + assertEquals("local broker dest stat dispatched", count, localStatistics.getDispatched().getCount()); + assertEquals("local broker dest stat dequeues", count, localStatistics.getDequeues().getCount()); + assertEquals("local broker dest stat forwards", count, localStatistics.getForwards().getCount()); + } + + protected void assertRemoteAdvisoryCount(final MessageConsumer advisoryConsumer, final int count) throws JMSException { + int available = 0; + ActiveMQMessage message = null; + while ((message = (ActiveMQMessage) advisoryConsumer.receive(1000)) != null) { + available++; + LOG.debug("advisory data structure: {}", message.getDataStructure()); + } + assertEquals(count, available); + } + + protected void assertRemoteAdvisoryCount(final MessageConsumer advisoryConsumer, + final int isSubOnCreationCount, final int isNotSubOnCreationCount) throws JMSException { + if (isUseVirtualDestSubsOnCreation) { + assertRemoteAdvisoryCount(advisoryConsumer, isSubOnCreationCount); + } else { + assertRemoteAdvisoryCount(advisoryConsumer, isNotSubOnCreationCount); + } + } + + @SuppressWarnings("unchecked") + protected void assertAdvisoryBrokerCounts(int virtualDestinationsCount, + int virtualDestinationConsumersCount, int brokerConsumerDestsCount) throws Exception { + + Field virtualDestinationsField = AdvisoryBroker.class.getDeclaredField("virtualDestinations"); + Field virtualDestinationConsumersField = AdvisoryBroker.class.getDeclaredField("virtualDestinationConsumers"); + Field brokerConsumerDestsField = AdvisoryBroker.class.getDeclaredField("brokerConsumerDests"); + + virtualDestinationsField.setAccessible(true); + virtualDestinationConsumersField.setAccessible(true); + brokerConsumerDestsField.setAccessible(true); + + Set virtualDestinations = (Set) + virtualDestinationsField.get(remoteAdvisoryBroker); + + ConcurrentMap virtualDestinationConsumers = + (ConcurrentMap) + virtualDestinationConsumersField.get(remoteAdvisoryBroker); + + ConcurrentMap brokerConsumerDests = + (ConcurrentMap) + brokerConsumerDestsField.get(remoteAdvisoryBroker); + + assertEquals(virtualDestinationsCount, virtualDestinations.size()); + assertEquals(virtualDestinationConsumersCount, virtualDestinationConsumers.size()); + assertEquals(brokerConsumerDestsCount, brokerConsumerDests.size()); + } + +}