diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 7bc675ed78..b1d3c1877e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -365,6 +365,11 @@ public class BrokerFilter implements Broker { next.processConsumerControl(consumerExchange, control); } + @Override + public void reapplyInterceptor() { + next.reapplyInterceptor(); + } + @Override public Scheduler getScheduler() { return next.getScheduler(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 9110059f0a..2d2e6ba7ab 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -357,6 +357,11 @@ public class EmptyBroker implements Broker { ConsumerControl control) { } + @Override + public void reapplyInterceptor() { + + } + @Override public Scheduler getScheduler() { return null; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index db75d275de..f692d8a2c9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -367,6 +367,11 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } + @Override + public void reapplyInterceptor() { + throw new BrokerStoppedException(this.message); + } + @Override public Scheduler getScheduler() { throw new BrokerStoppedException(this.message); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 0e6b199ed6..112378a84f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -376,6 +376,11 @@ public class MutableBrokerFilter implements Broker { getNext().processConsumerControl(consumerExchange, control); } + @Override + public void reapplyInterceptor() { + getNext().reapplyInterceptor(); + } + @Override public Scheduler getScheduler() { return getNext().getScheduler(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index efa02cbc9a..7f7b7e4c99 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -31,6 +31,7 @@ import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.DestinationDoesNotExistException; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; @@ -580,4 +581,18 @@ public abstract class AbstractRegion implements Region { } } } + + public void reapplyInterceptor() { + DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); + Map map = getDestinationMap(); + for (ActiveMQDestination key : map.keySet()) { + Destination destination = map.get(key); + if (destination instanceof CompositeDestinationFilter) { + destination = ((CompositeDestinationFilter)destination).next; + } + destination = destinationInterceptor.intercept(destination); + getDestinationMap().put(key, destination); + destinations.put(key, destination); + } + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java index ab9d1ebd47..d9d7c9d6c7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Region.java @@ -149,5 +149,7 @@ public interface Region extends Service { Set getDestinations(ActiveMQDestination destination); void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control); + + void reapplyInterceptor(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 40f599bfc4..4d547537d9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -859,4 +859,11 @@ public class RegionBroker extends EmptyBroker { public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) { this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; } + + public void reapplyInterceptor() { + queueRegion.reapplyInterceptor(); + topicRegion.reapplyInterceptor(); + tempQueueRegion.reapplyInterceptor(); + tempTopicRegion.reapplyInterceptor(); + } } diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java index 90dba00f5f..73c172d923 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java @@ -580,6 +580,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker(); ((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors); info("applied new: " + interceptorsList); + regionBroker.reapplyInterceptor(); } } }); diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java index 5c007a70a8..94a10463a3 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java @@ -80,6 +80,20 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); } + @Test + public void testModComposite() throws Exception { + final String brokerConfig = configurationSeed + "-mod-composite-vd-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-add-composite-vd"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); + + applyNewConfig(brokerConfig, configurationSeed + "-mod-composite-vd", SLEEP); + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); + + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.CompositeQueue"); + } + @Test public void testNewNoDefaultVirtualTopicSupport() throws Exception { final String brokerConfig = configurationSeed + "-no-vd-vt-broker";