diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index c75b70cf63..eb65fb1dba 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -129,7 +129,9 @@ public abstract class AbstractRegion implements Region { dest.start(); destinations.put(destination, dest); destinationMap.put(destination, dest); - addSubscriptionsForDestination(context, dest); + if (!dest.getActiveMQDestination().isPattern()) { + addSubscriptionsForDestination(context, dest); + } } if (dest == null) { throw new JMSException("The destination " + destination + " does not exist."); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java index a460e8138b..c99605ef5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/CompositeDestinationInterceptor.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.broker.region; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; + /** * Represents a Composite Pattern of a {@link DestinationInterceptor} * @@ -42,5 +46,11 @@ public class CompositeDestinationInterceptor implements DestinationInterceptor { interceptors[i].remove(destination); } } + + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + for (int i = 0; i < interceptors.length; i++) { + interceptors[i].create(broker, context, destination); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java index 3bd67b4fd2..8687589e6b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationInterceptor.java @@ -17,6 +17,10 @@ package org.apache.activemq.broker.region; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; + /** * Represents an interceptor on destination instances. * @@ -28,4 +32,6 @@ public interface DestinationInterceptor { void remove(Destination destination); + void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception; + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 8ca0e762b1..c844983ed2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -438,6 +438,9 @@ public class RegionBroker extends EmptyBroker { @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { ActiveMQDestination destination = info.getDestination(); + if (destinationInterceptor != null) { + destinationInterceptor.create(this, context, destination); + } synchronized (purgeInactiveDestinationsTask) { switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java index c84ddac6b8..e3416f3b05 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java @@ -18,7 +18,10 @@ package org.apache.activemq.broker.region.virtual; import java.util.Collection; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; /** * @@ -35,6 +38,8 @@ public abstract class CompositeDestination implements VirtualDestination { return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage()); } + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) { + } public void remove(Destination destination) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java index 4e4b247ed8..ea5af02848 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.broker.region.virtual; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.*; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationInterceptor; @@ -86,6 +84,8 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware } + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {} + // Properties // ------------------------------------------------------------------------- diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java index 57d742c20b..21a383d676 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.virtual; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.command.ActiveMQDestination; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java index 2e3a6a7ec9..85e605a328 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java @@ -21,10 +21,13 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationMap; @@ -45,8 +48,8 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor { List destinations = new ArrayList(); for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) { VirtualDestination virtualDestination = (VirtualDestination)iter.next(); - Destination newNestination = virtualDestination.intercept(destination); - destinations.add(newNestination); + Destination newDestination = virtualDestination.intercept(destination); + destinations.add(newDestination); } if (!destinations.isEmpty()) { if (destinations.size() == 1) { @@ -60,6 +63,12 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor { } + public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + for (VirtualDestination virt: virtualDestinations) { + virt.create(broker, context, destination); + } + } + public synchronized void remove(Destination destination) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index e32b4f2355..3721459e1a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -16,9 +16,13 @@ */ package org.apache.activemq.broker.region.virtual; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationFilter; /** * Creates Virtual @@ -48,6 +52,15 @@ public class VirtualTopic implements VirtualDestination { } + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) { + DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT)); + if (filter.matches(destination)) { + broker.addDestination(context, destination, false); + } + } + } + public void remove(Destination destination) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java index ec2e45eed0..abe7530f8d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/filter/PrefixDestinationFilter.java @@ -47,7 +47,7 @@ public class PrefixDestinationFilter extends DestinationFilter { if (path.length >= length) { int size = length - 1; for (int i = 0; i < size; i++) { - if (!prefixes[i].equals(path[i])) { + if (!path[i].equals(ANY_CHILD) && !prefixes[i].equals(ANY_CHILD) && !prefixes[i].equals(path[i])) { return false; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java index 61c5c1bb79..516e9054cf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -185,6 +185,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { for (Iterator i = brokerList.iterator(); i.hasNext();) { BrokerService broker = i.next().broker; broker.start(); + broker.waitUntilStarted(); } Thread.sleep(maxSetupTime); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java index f4644b6407..65b960baa2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/SingleBrokerVirtualDestinationsWithWildcardTest.java @@ -82,9 +82,9 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple startAllBrokers(); sendReceive("Consumer.a.local.test.>", false, "Consumer.a.local.test.>", false, 1, 1); - sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created + sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1); sendReceive("Consumer.a.global.test.>", false, "Consumer.a.global.test.>", false, 1, 1); - sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created + sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1); destroyAllBrokers(); } @@ -109,8 +109,6 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple private BrokerService createAndConfigureBroker(URI uri) throws Exception { BrokerService broker = createBroker(uri); - // without this testVirtualDestinationsWithWildcardWithoutIndividualVirtualQueue will fail - broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")}); configurePersistenceAdapter(broker);