mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4995 - reapply destination interceptor after change
This commit is contained in:
parent
ad3041034b
commit
55da9bc821
|
@ -365,6 +365,11 @@ public class BrokerFilter implements Broker {
|
||||||
next.processConsumerControl(consumerExchange, control);
|
next.processConsumerControl(consumerExchange, control);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
next.reapplyInterceptor();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Scheduler getScheduler() {
|
public Scheduler getScheduler() {
|
||||||
return next.getScheduler();
|
return next.getScheduler();
|
||||||
|
|
|
@ -357,6 +357,11 @@ public class EmptyBroker implements Broker {
|
||||||
ConsumerControl control) {
|
ConsumerControl control) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Scheduler getScheduler() {
|
public Scheduler getScheduler() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -367,6 +367,11 @@ public class ErrorBroker implements Broker {
|
||||||
throw new BrokerStoppedException(this.message);
|
throw new BrokerStoppedException(this.message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
throw new BrokerStoppedException(this.message);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Scheduler getScheduler() {
|
public Scheduler getScheduler() {
|
||||||
throw new BrokerStoppedException(this.message);
|
throw new BrokerStoppedException(this.message);
|
||||||
|
|
|
@ -376,6 +376,11 @@ public class MutableBrokerFilter implements Broker {
|
||||||
getNext().processConsumerControl(consumerExchange, control);
|
getNext().processConsumerControl(consumerExchange, control);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
getNext().reapplyInterceptor();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Scheduler getScheduler() {
|
public Scheduler getScheduler() {
|
||||||
return getNext().getScheduler();
|
return getNext().getScheduler();
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||||
import org.apache.activemq.DestinationDoesNotExistException;
|
import org.apache.activemq.DestinationDoesNotExistException;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.virtual.CompositeDestinationFilter;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ConsumerControl;
|
import org.apache.activemq.command.ConsumerControl;
|
||||||
import org.apache.activemq.command.ConsumerId;
|
import org.apache.activemq.command.ConsumerId;
|
||||||
|
@ -580,4 +581,18 @@ public abstract class AbstractRegion implements Region {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
|
||||||
|
Map<ActiveMQDestination, Destination> map = getDestinationMap();
|
||||||
|
for (ActiveMQDestination key : map.keySet()) {
|
||||||
|
Destination destination = map.get(key);
|
||||||
|
if (destination instanceof CompositeDestinationFilter) {
|
||||||
|
destination = ((CompositeDestinationFilter)destination).next;
|
||||||
|
}
|
||||||
|
destination = destinationInterceptor.intercept(destination);
|
||||||
|
getDestinationMap().put(key, destination);
|
||||||
|
destinations.put(key, destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,4 +150,6 @@ public interface Region extends Service {
|
||||||
|
|
||||||
void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control);
|
void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control);
|
||||||
|
|
||||||
|
void reapplyInterceptor();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -859,4 +859,11 @@ public class RegionBroker extends EmptyBroker {
|
||||||
public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
|
public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
|
||||||
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
|
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void reapplyInterceptor() {
|
||||||
|
queueRegion.reapplyInterceptor();
|
||||||
|
topicRegion.reapplyInterceptor();
|
||||||
|
tempQueueRegion.reapplyInterceptor();
|
||||||
|
tempTopicRegion.reapplyInterceptor();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -580,6 +580,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
|
||||||
RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
|
RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
|
||||||
((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
|
((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
|
||||||
info("applied new: " + interceptorsList);
|
info("applied new: " + interceptorsList);
|
||||||
|
regionBroker.reapplyInterceptor();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -80,6 +80,20 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
|
||||||
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
|
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testModComposite() throws Exception {
|
||||||
|
final String brokerConfig = configurationSeed + "-mod-composite-vd-broker";
|
||||||
|
applyNewConfig(brokerConfig, configurationSeed + "-add-composite-vd");
|
||||||
|
startBroker(brokerConfig);
|
||||||
|
assertTrue("broker alive", brokerService.isStarted());
|
||||||
|
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
|
||||||
|
|
||||||
|
applyNewConfig(brokerConfig, configurationSeed + "-mod-composite-vd", SLEEP);
|
||||||
|
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
|
||||||
|
|
||||||
|
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.CompositeQueue");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
|
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
|
||||||
final String brokerConfig = configurationSeed + "-no-vd-vt-broker";
|
final String brokerConfig = configurationSeed + "-no-vd-vt-broker";
|
||||||
|
|
Loading…
Reference in New Issue