https://issues.apache.org/jira/browse/AMQ-4682 - implement virtualDestinationInterceptor runtime updates

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1516451 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-08-22 13:47:15 +00:00
parent 53ec6655b0
commit 92860ae622
15 changed files with 580 additions and 50 deletions

View File

@ -27,7 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination;
*/
public class CompositeDestinationInterceptor implements DestinationInterceptor {
private final DestinationInterceptor[] interceptors;
private DestinationInterceptor[] interceptors;
public CompositeDestinationInterceptor(final DestinationInterceptor[] interceptors) {
this.interceptors = interceptors;
@ -52,5 +52,8 @@ public class CompositeDestinationInterceptor implements DestinationInterceptor {
interceptors[i].create(broker, context, destination);
}
}
public void setInterceptors(final DestinationInterceptor[] interceptors) {
this.interceptors = interceptors;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.virtual;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -96,4 +97,8 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
};
}
@Override
public String toString() {
return "VirtualDestinationInterceptor" + Arrays.asList(virtualDestinations);
}
}

View File

@ -120,4 +120,9 @@ public class VirtualTopic implements VirtualDestination {
public void setLocal(boolean local) {
this.local = local;
}
@Override
public String toString() {
return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').append(postfix).append(',').append(selectorAware).append(',').append(local).toString();
}
}

View File

@ -88,6 +88,14 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -17,12 +17,15 @@
package org.apache.activemq.plugin;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
@ -31,8 +34,19 @@ import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.schema.core.Broker;
import org.apache.activemq.schema.core.CompositeQueue;
import org.apache.activemq.schema.core.CompositeTopic;
import org.apache.activemq.schema.core.NetworkConnector;
import org.apache.activemq.schema.core.VirtualDestinationInterceptor;
import org.apache.activemq.schema.core.VirtualTopic;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
@ -50,6 +64,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private Resource configToMonitor;
private Broker currentConfiguration;
private Runnable monitorTask;
private ConcurrentLinkedQueue<Runnable> destinationInterceptorUpdateWork = new ConcurrentLinkedQueue<Runnable>();
private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
public RuntimeConfigurationBroker(org.apache.activemq.broker.Broker next) {
super(next);
@ -80,8 +96,33 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
super.stop();
}
// modification to virtual destinations interceptor needs exclusive access to destination add
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
Runnable work = destinationInterceptorUpdateWork.poll();
if (work != null) {
try {
addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = destinationInterceptorUpdateWork.poll();
} while (work != null);
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.writeLock().unlock();
}
} else {
try {
addDestinationBarrier.readLock().lockInterruptibly();
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.readLock().unlock();
}
}
}
private void monitorModification(final Resource configToMonitor) {
Runnable monitorTask = new Runnable() {
monitorTask = new Runnable() {
@Override
public void run() {
try {
@ -105,63 +146,79 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
LOG.info("configuration change in " + configToMonitor + " at: " + new Date(lastModified));
LOG.info("current:" + currentConfiguration);
LOG.info("new :" + changed);
processNetworkConnectors(currentConfiguration, changed);
processSelectiveChanges(currentConfiguration, changed);
currentConfiguration = changed;
} else {
LOG.info("file modification but no material change to configuration in " + configToMonitor + " at: " + new Date(lastModified));
}
}
private void processNetworkConnectors(Broker currentConfiguration, Broker modifiedConfiguration) {
List<Broker.NetworkConnectors> currentNCsElems = filterElement(
currentConfiguration.getContents(), Broker.NetworkConnectors.class);
List<Broker.NetworkConnectors> modifiedNCsElems = filterElement(
modifiedConfiguration.getContents(), Broker.NetworkConnectors.class);
private void processSelectiveChanges(Broker currentConfiguration, Broker modifiedConfiguration) {
for (Class upDatable : new Class[]{Broker.NetworkConnectors.class, Broker.DestinationInterceptors.class}) {
processChanges(currentConfiguration, modifiedConfiguration, upDatable);
}
}
private void processChanges(Broker currentConfiguration, Broker modifiedConfiguration, Class upDatable) {
List current = filter(currentConfiguration, upDatable);
List modified = filter(modifiedConfiguration, upDatable);
int modIndex = 0, currentIndex = 0;
for (; modIndex < modifiedNCsElems.size() && currentIndex < currentNCsElems.size(); modIndex++, currentIndex++) {
// walk the list of individual nc's...
applyModifications(currentNCsElems.get(currentIndex).getContents(),
modifiedNCsElems.get(modIndex).getContents());
for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
// walk the list for mods
applyModifications(getContents(current.get(currentIndex)),
getContents(modified.get(modIndex)));
}
for (; modIndex < modifiedNCsElems.size(); modIndex++) {
// new networkConnectors element; add all
for (Object nc : modifiedNCsElems.get(modIndex).getContents()) {
addNetworkConnector(nc);
for (; modIndex < modified.size(); modIndex++) {
// new element; add all
for (Object nc : getContents(modified.get(modIndex))) {
addNew(nc);
}
}
for (; currentIndex < currentNCsElems.size(); currentIndex++) {
// removal of networkConnectors element; remove all
for (Object nc : modifiedNCsElems.get(modIndex).getContents()) {
removeNetworkConnector(nc);
for (; currentIndex < current.size(); currentIndex++) {
// removal of element; remove all
for (Object nc : getContents(current.get(currentIndex))) {
remove(nc);
}
}
}
// mapping all supported updatable elements to support getContents
private List<Object> getContents(Object o) {
try {
return (List<Object>) o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
} catch (Exception e) {
LOG.info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
}
return new ArrayList<Object>();
}
private void applyModifications(List<Object> current, List<Object> modification) {
int modIndex = 0, currentIndex = 0;
for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
Object currentNc = current.get(currentIndex);
Object candidateNc = modification.get(modIndex);
if (! currentNc.equals(candidateNc)) {
LOG.info("modification to:" + currentNc + " , with: " + candidateNc);
removeNetworkConnector(currentNc);
addNetworkConnector(candidateNc);
Object existing = current.get(currentIndex);
Object candidate = modification.get(modIndex);
if (! existing.equals(candidate)) {
LOG.info("modification to:" + existing + " , with: " + candidate);
remove(existing);
addNew(candidate);
}
}
for (; modIndex < modification.size(); modIndex++) {
addNetworkConnector(modification.get(modIndex));
addNew(modification.get(modIndex));
}
for (; currentIndex < current.size(); currentIndex++) {
removeNetworkConnector(current.get(currentIndex));
remove(current.get(currentIndex));
}
}
private void removeNetworkConnector(Object o) {
private void remove(Object o) {
if (o instanceof NetworkConnector) {
NetworkConnector toRemove = (NetworkConnector) o;
for (org.apache.activemq.network.NetworkConnector existingCandidate :
@ -177,6 +234,24 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
}
} else if (o instanceof VirtualDestinationInterceptor) {
// whack it
destinationInterceptorUpdateWork.add(new Runnable() {
public void run() {
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
for (DestinationInterceptor candidate : getBrokerService().getDestinationInterceptors()) {
if (!(candidate instanceof org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor)) {
interceptorsList.add(candidate);
}
}
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
LOG.trace("removed VirtualDestinationInterceptor from: " + interceptorsList);
}
});
} else {
LOG.info("No runtime support for removal of: " + o);
}
}
@ -196,7 +271,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
return true;
}
private void addNetworkConnector(Object o) {
private void addNew(Object o) {
if (o instanceof NetworkConnector) {
NetworkConnector networkConnector = (NetworkConnector) o;
if (networkConnector.getUri() != null) {
@ -206,7 +281,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
Properties properties = new Properties();
IntrospectionSupport.getProperties(networkConnector, properties, null);
properties.remove("uri");
LOG.trace("Applying props: " + properties);
LOG.trace("applying networkConnector props: " + properties);
IntrospectionSupport.setProperties(nc, properties);
nc.start();
LOG.info("started new network connector: " + nc);
@ -214,19 +289,87 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
LOG.error("Failed to add new networkConnector " + networkConnector, e);
}
}
} else if (o instanceof VirtualDestinationInterceptor) {
final VirtualDestinationInterceptor dto = (VirtualDestinationInterceptor) o;
destinationInterceptorUpdateWork.add(new Runnable() {
public void run() {
boolean updatedExistingInterceptor = false;
for (DestinationInterceptor destinationInterceptor : getBrokerService().getDestinationInterceptors()) {
if (destinationInterceptor instanceof org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor) {
// update existing interceptor
final org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor virtualDestinationInterceptor =
(org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
LOG.trace("applied updates to: " + virtualDestinationInterceptor);
updatedExistingInterceptor = true;
}
}
if (!updatedExistingInterceptor) {
// add
org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor virtualDestinationInterceptor =
new org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
interceptorsList.addAll(Arrays.asList(getBrokerService().getDestinationInterceptors()));
interceptorsList.add(virtualDestinationInterceptor);
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
LOG.trace("applied new: " + interceptorsList);
}
}
});
} else {
LOG.info("No runtime support for modifications to " + o);
}
}
private <T> List<T> filterElement(List<Object> objectList, Class<T> type) {
List<T> result = new LinkedList<T>();
private VirtualDestination[] fromDto(VirtualDestinationInterceptor virtualDestinationInterceptor) {
List<VirtualDestination> answer = new ArrayList<VirtualDestination>();
for (Object vd : filter(virtualDestinationInterceptor, VirtualDestinationInterceptor.VirtualDestinations.class)) {
for (Object vt : filter(vd, VirtualTopic.class)) {
answer.add(fromDto(vt, new org.apache.activemq.broker.region.virtual.VirtualTopic()));
}
for (Object vt : filter(vd, CompositeTopic.class)) {
answer.add(fromDto(vt, new org.apache.activemq.broker.region.virtual.CompositeTopic()));
}
for (Object vt : filter(vd, CompositeQueue.class)) {
answer.add(fromDto(vt, new org.apache.activemq.broker.region.virtual.CompositeQueue()));
}
}
VirtualDestination[] array = new VirtualDestination[answer.size()];
answer.toArray(array);
return array;
}
private VirtualDestination fromDto(Object dto, VirtualDestination instance) {
Properties properties = new Properties();
IntrospectionSupport.getProperties(dto, properties, null);
LOG.trace("applying props: " + properties + ", to " + instance.getClass().getSimpleName());
IntrospectionSupport.setProperties(instance, properties);
return instance;
}
private <T> List<Object> filter(Object obj, Class<T> type) {
return filter(getContents(obj), type);
}
private <T> List<Object> filter(List<Object> objectList, Class<T> type) {
List<Object> result = new LinkedList<Object>();
for (Object o : objectList) {
if (o instanceof JAXBElement) {
JAXBElement element = (JAXBElement) o;
if (element.getDeclaredType() == type) {
result.add((T) element.getValue());
}
} else if (type.isAssignableFrom(o.getClass())) {
result.add((T) o);
}
}
return result;

View File

@ -2,13 +2,26 @@
xmlns:jxb="http://java.sun.com/xml/ns/jaxb"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<jxb:bindings schemaLocation="activemq.xsd" node="/xs:schema/xs:element[@name='broker']">
<jxb:bindings schemaLocation="activemq.xsd" node="/xs:schema">
<jxb:bindings node="xs:complexType/xs:choice">
<!-- provide uniform accessor to all interesting elements via getContents() -->
<jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:complexType/xs:choice/xs:choice/xs:element[@name='networkConnectors']/xs:complexType/xs:choice">
<jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice/xs:choice/xs:element[@name='networkConnectors']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice/xs:choice/xs:element[@name='destinationInterceptors']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='virtualDestinationInterceptor']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element/xs:complexType/xs:choice/xs:choice/xs:element[@name='virtualDestinations']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>

View File

@ -24,6 +24,7 @@ import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@ -33,12 +34,14 @@ import org.springframework.core.io.Resource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class NetworkConnectorTest {
public static final Logger LOG = LoggerFactory.getLogger(NetworkConnectorTest.class);
public static final int SLEEP = 4; // seconds
public static final String EMPTY_UPDATABLE_CONFIG = "emptyUpdatableConfig1000" ;
String configurationSeed = "networkConnectorTest";
BrokerService brokerService;
@ -55,9 +58,9 @@ public class NetworkConnectorTest {
}
@Test
public void testNewConnector() throws Exception {
public void testNew() throws Exception {
final String brokerConfig = configurationSeed + "-no-nc-broker";
applyNewConfig(brokerConfig, configurationSeed);
applyNewConfig(brokerConfig, EMPTY_UPDATABLE_CONFIG);
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
@ -70,12 +73,12 @@ public class NetworkConnectorTest {
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
applyNewConfig(brokerConfig, configurationSeed + "-one-nc");
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertEquals("same instance", networkConnector, brokerService.getNetworkConnectors().get(0));
assertSame("same instance", networkConnector, brokerService.getNetworkConnectors().get(0));
}
@Test
public void testModConnector() throws Exception {
public void testMod() throws Exception {
final String brokerConfig = configurationSeed + "-one-nc-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-nc");
@ -97,11 +100,11 @@ public class NetworkConnectorTest {
// apply again - ensure no change
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-nc", SLEEP);
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertEquals("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
}
@Test
public void testRemoveConnector() throws Exception {
public void testRemove() throws Exception {
final String brokerConfig = configurationSeed + "-two-nc-broker";
applyNewConfig(brokerConfig, configurationSeed + "-two-nc");
@ -111,7 +114,12 @@ public class NetworkConnectorTest {
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
assertEquals("one network connectors", 1, brokerService.getNetworkConnectors().size());
assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getNetworkConnectors().size();
}
}));
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("name match", "one", remainingNetworkConnector.getName());

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
import javax.jms.*;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import static org.junit.Assert.*;
public class VirtualDestTest {
public static final Logger LOG = LoggerFactory.getLogger(VirtualDestTest.class);
public static final int SLEEP = 4; // seconds
String configurationSeed = "virtualDestTest";
BrokerService brokerService;
public void startBroker(String configFileName) throws Exception {
brokerService = createBroker(configFileName);
brokerService.start();
brokerService.waitUntilStarted();
}
public BrokerService createBroker(String configFileName) throws Exception {
brokerService = new BrokerService();
return BrokerFactory.createBroker("xbean:org/apache/activemq/" + configFileName + ".xml");
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test
public void testNew() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG);
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
// default config has support for VirtualTopic.>
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("default names in place", "VirtualTopic.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
exerciseVirtualTopic("VirtualTopic.Default");
applyNewConfig(brokerConfig, configurationSeed + "-one-vd", SLEEP);
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("new names in place", "A.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
// apply again - ensure no change
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
assertSame("same instance", newValue, (((VirtualDestinationInterceptor) brokerService.getDestinationInterceptors()[0])));
}
@Test
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG);
brokerService = createBroker(brokerConfig);
brokerService.setUseVirtualTopics(false);
brokerService.start();
brokerService.waitUntilStarted();
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 0, interceptors.length);
applyNewConfig(brokerConfig, configurationSeed + "-one-vd", SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
// apply again - ensure no change
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
assertSame("same instance", newValue, (((VirtualDestinationInterceptor) brokerService.getDestinationInterceptors()[0])));
}
@Test
public void testNewWithMirrorQueueSupport() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG);
brokerService = createBroker(brokerConfig);
brokerService.setUseMirroredQueues(true);
brokerService.start();
brokerService.waitUntilStarted();
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 2, interceptors.length);
applyNewConfig(brokerConfig, configurationSeed + "-one-vd", SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 2, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
// apply again - ensure no change
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
assertSame("same instance", newValue, (((VirtualDestinationInterceptor) brokerService.getDestinationInterceptors()[0])));
}
@Test
public void testRemove() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("configured names in place", "A.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
exerciseVirtualTopic("A.Default");
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG, SLEEP);
// update will happen on addDestination
forceAddDestination("AnyDest");
assertTrue("getDestinationInterceptors empty on time", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() {
return 0 == brokerService.getDestinationInterceptors().length;
}
}));
// reverse the remove, add again
applyNewConfig(brokerConfig, configurationSeed + "-one-vd", SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.NewOne");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
}
@Test
public void testMod() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
exerciseVirtualTopic("A.Default");
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-vd", SLEEP);
exerciseVirtualTopic("B.Default");
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testModWithMirroredQueue() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
brokerService = createBroker(brokerConfig);
brokerService.setUseMirroredQueues(true);
brokerService.start();
brokerService.waitUntilStarted();
assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
exerciseVirtualTopic("A.Default");
applyNewConfig(brokerConfig, configurationSeed + "-mod-one-vd", SLEEP);
exerciseVirtualTopic("B.Default");
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
private void forceAddDestination(String dest) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue("Consumer.A." + dest));
connection.close();
}
private void exerciseVirtualTopic(String topic) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue("Consumer.A." + topic));
MessageProducer producer = session.createProducer(session.createTopic(topic));
final String body = "To vt:" + topic;
producer.send(session.createTextMessage(body));
Message message = null;
for (int i=0; i<5 && message == null; i++) {
message = consumer.receive(1000);
}
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
private void applyNewConfig(String configName, String newConfigName) throws Exception {
applyNewConfig(configName, newConfigName, 0l);
}
private void applyNewConfig(String configName, String newConfigName, long sleep) throws Exception {
Resource resource = Utils.resourceFromString("org/apache/activemq");
FileOutputStream current = new FileOutputStream(new File(resource.getFile(), configName + ".xml"));
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), newConfigName + ".xml"));
modifications.getChannel().transferTo(0, Long.MAX_VALUE, current.getChannel());
current.flush();
LOG.info("Updated: " + current.getChannel());
if (sleep > 0) {
// wait for mods to kick in
TimeUnit.SECONDS.sleep(sleep);
}
}
}

View File

@ -21,7 +21,7 @@
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG
#log4j.logger.org.apache.activemq.broker.region=TRACE
log4j.logger.org.apache.activemq.plugin.RuntimeConfigurationBroker=TRACE
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender

View File

@ -17,7 +17,6 @@
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

View File

@ -17,7 +17,6 @@
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

View File

@ -17,7 +17,6 @@
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

View File

@ -17,7 +17,6 @@
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="B.>" selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="A.>" selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
</broker>
</beans>