mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3197 - virtual destinations and wildcards
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1076651 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b98047ed8c
commit
d17ecebec4
|
@ -129,7 +129,9 @@ public abstract class AbstractRegion implements Region {
|
|||
dest.start();
|
||||
destinations.put(destination, dest);
|
||||
destinationMap.put(destination, dest);
|
||||
addSubscriptionsForDestination(context, dest);
|
||||
if (!dest.getActiveMQDestination().isPattern()) {
|
||||
addSubscriptionsForDestination(context, dest);
|
||||
}
|
||||
}
|
||||
if (dest == null) {
|
||||
throw new JMSException("The destination " + destination + " does not exist.");
|
||||
|
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
||||
/**
|
||||
* Represents a Composite Pattern of a {@link DestinationInterceptor}
|
||||
*
|
||||
|
@ -42,5 +46,11 @@ public class CompositeDestinationInterceptor implements DestinationInterceptor {
|
|||
interceptors[i].remove(destination);
|
||||
}
|
||||
}
|
||||
|
||||
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
for (int i = 0; i < interceptors.length; i++) {
|
||||
interceptors[i].create(broker, context, destination);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,10 @@
|
|||
package org.apache.activemq.broker.region;
|
||||
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
||||
/**
|
||||
* Represents an interceptor on destination instances.
|
||||
*
|
||||
|
@ -28,4 +32,6 @@ public interface DestinationInterceptor {
|
|||
|
||||
void remove(Destination destination);
|
||||
|
||||
void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -438,6 +438,9 @@ public class RegionBroker extends EmptyBroker {
|
|||
@Override
|
||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
ActiveMQDestination destination = info.getDestination();
|
||||
if (destinationInterceptor != null) {
|
||||
destinationInterceptor.create(this, context, destination);
|
||||
}
|
||||
synchronized (purgeInactiveDestinationsTask) {
|
||||
switch (destination.getDestinationType()) {
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
|
|
|
@ -18,7 +18,10 @@ package org.apache.activemq.broker.region.virtual;
|
|||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -35,6 +38,8 @@ public abstract class CompositeDestination implements VirtualDestination {
|
|||
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
|
||||
}
|
||||
|
||||
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {
|
||||
}
|
||||
|
||||
public void remove(Destination destination) {
|
||||
}
|
||||
|
|
|
@ -16,9 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.virtual;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.*;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFilter;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
|
@ -86,6 +84,8 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware
|
|||
|
||||
}
|
||||
|
||||
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.virtual;
|
||||
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
|
|
|
@ -21,10 +21,13 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFilter;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.filter.DestinationMap;
|
||||
|
||||
|
@ -45,8 +48,8 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
|
|||
List<Destination> destinations = new ArrayList<Destination>();
|
||||
for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
|
||||
VirtualDestination virtualDestination = (VirtualDestination)iter.next();
|
||||
Destination newNestination = virtualDestination.intercept(destination);
|
||||
destinations.add(newNestination);
|
||||
Destination newDestination = virtualDestination.intercept(destination);
|
||||
destinations.add(newDestination);
|
||||
}
|
||||
if (!destinations.isEmpty()) {
|
||||
if (destinations.size() == 1) {
|
||||
|
@ -60,6 +63,12 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
|
|||
}
|
||||
|
||||
|
||||
public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
for (VirtualDestination virt: virtualDestinations) {
|
||||
virt.create(broker, context, destination);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void remove(Destination destination) {
|
||||
}
|
||||
|
||||
|
|
|
@ -16,9 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.virtual;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.filter.DestinationFilter;
|
||||
|
||||
/**
|
||||
* Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
|
||||
|
@ -48,6 +52,15 @@ public class VirtualTopic implements VirtualDestination {
|
|||
}
|
||||
|
||||
|
||||
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
|
||||
DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
|
||||
if (filter.matches(destination)) {
|
||||
broker.addDestination(context, destination, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(Destination destination) {
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ public class PrefixDestinationFilter extends DestinationFilter {
|
|||
if (path.length >= length) {
|
||||
int size = length - 1;
|
||||
for (int i = 0; i < size; i++) {
|
||||
if (!prefixes[i].equals(path[i])) {
|
||||
if (!path[i].equals(ANY_CHILD) && !prefixes[i].equals(ANY_CHILD) && !prefixes[i].equals(path[i])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
|
|||
for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
|
||||
BrokerService broker = i.next().broker;
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
}
|
||||
|
||||
Thread.sleep(maxSetupTime);
|
||||
|
|
|
@ -82,9 +82,9 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
|
|||
startAllBrokers();
|
||||
|
||||
sendReceive("Consumer.a.local.test.>", false, "Consumer.a.local.test.>", false, 1, 1);
|
||||
sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
|
||||
sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1);
|
||||
sendReceive("Consumer.a.global.test.>", false, "Consumer.a.global.test.>", false, 1, 1);
|
||||
sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 2); // duplicates due to wildcard queue pre-created
|
||||
sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1);
|
||||
|
||||
destroyAllBrokers();
|
||||
}
|
||||
|
@ -109,8 +109,6 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
|
|||
|
||||
private BrokerService createAndConfigureBroker(URI uri) throws Exception {
|
||||
BrokerService broker = createBroker(uri);
|
||||
// without this testVirtualDestinationsWithWildcardWithoutIndividualVirtualQueue will fail
|
||||
broker.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("Consumer.a.local.test.1"), new ActiveMQQueue("Consumer.a.global.test.1")});
|
||||
|
||||
configurePersistenceAdapter(broker);
|
||||
|
||||
|
|
Loading…
Reference in New Issue