https://issues.apache.org/jira/browse/AMQ-4682 - add jmx mbean, manual update, validation and test refactor

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1516650 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-08-22 22:47:21 +00:00
parent 85bb229aa7
commit 8b8e726eec
9 changed files with 474 additions and 131 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.io.IOException; import java.io.IOException;
import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
@ -26,6 +27,9 @@ import java.util.Properties;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement; import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException; import javax.xml.bind.JAXBException;
@ -33,14 +37,20 @@ import javax.xml.bind.Unmarshaller;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import javax.xml.validation.ValidatorHandler;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.Broker; import org.apache.activemq.schema.core.Broker;
import org.apache.activemq.schema.core.CompositeQueue; import org.apache.activemq.schema.core.CompositeQueue;
import org.apache.activemq.schema.core.CompositeTopic; import org.apache.activemq.schema.core.CompositeTopic;
@ -59,6 +69,7 @@ import org.xml.sax.SAXException;
public class RuntimeConfigurationBroker extends BrokerFilter { public class RuntimeConfigurationBroker extends BrokerFilter {
public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class); public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class);
public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin";
private long checkPeriod; private long checkPeriod;
private long lastModified = -1; private long lastModified = -1;
private Resource configToMonitor; private Resource configToMonitor;
@ -66,6 +77,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private Runnable monitorTask; private Runnable monitorTask;
private ConcurrentLinkedQueue<Runnable> destinationInterceptorUpdateWork = new ConcurrentLinkedQueue<Runnable>(); private ConcurrentLinkedQueue<Runnable> destinationInterceptorUpdateWork = new ConcurrentLinkedQueue<Runnable>();
private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
private ObjectName objectName;
private String infoString;
public RuntimeConfigurationBroker(org.apache.activemq.broker.Broker next) { public RuntimeConfigurationBroker(org.apache.activemq.broker.Broker next) {
super(next); super(next);
@ -82,6 +95,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
currentConfiguration = loadConfiguration(configToMonitor); currentConfiguration = loadConfiguration(configToMonitor);
monitorModification(configToMonitor); monitorModification(configToMonitor);
registerMbean();
} }
@Override @Override
@ -93,9 +107,31 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
LOG.warn("Failed to cancel config monitor task", letsNotStopStop); LOG.warn("Failed to cancel config monitor task", letsNotStopStop);
} }
} }
unregisterMbean();
super.stop(); super.stop();
} }
private void registerMbean() {
if (getBrokerService().isUseJmx()) {
ManagementContext managementContext = getBrokerService().getManagementContext();
try {
objectName = new ObjectName(getBrokerService().getBrokerObjectName().toString() + objectNamePropsAppendage);
managementContext.registerMBean(new RuntimeConfigurationView(this), objectName);
} catch (Exception ignored) {
LOG.debug("failed to register RuntimeConfigurationMBean", ignored);
}
}
}
private void unregisterMbean() {
if (objectName != null) {
try {
getBrokerService().getManagementContext().unregisterMBean(objectName);
} catch (JMException ignored) {
}
}
}
// modification to virtual destinations interceptor needs exclusive access to destination add // modification to virtual destinations interceptor needs exclusive access to destination add
@Override @Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
@ -121,6 +157,15 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
} }
} }
public String updateNow() {
LOG.info("Manual configuration update triggered");
infoString = "";
applyModifications(configToMonitor);
String result = infoString;
infoString = null;
return result;
}
private void monitorModification(final Resource configToMonitor) { private void monitorModification(final Resource configToMonitor) {
monitorTask = new Runnable() { monitorTask = new Runnable() {
@Override @Override
@ -134,22 +179,40 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
} }
} }
}; };
if (lastModified > 0) { if (lastModified > 0 && checkPeriod > 0) {
this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod); this.getBrokerService().getScheduler().executePeriodically(monitorTask, checkPeriod);
LOG.info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor); info("Monitoring for updates (every " + checkPeriod + "millis) : " + configToMonitor);
} }
} }
private void info(String s) {
LOG.info(s);
if (infoString != null) {
infoString += s;
infoString += ";";
}
}
private void info(String s, Throwable t) {
LOG.info(s, t);
if (infoString != null) {
infoString += s;
infoString += ", " + t;
infoString += ";";
}
}
private void applyModifications(Resource configToMonitor) { private void applyModifications(Resource configToMonitor) {
Broker changed = loadConfiguration(configToMonitor); Broker changed = loadConfiguration(configToMonitor);
if (!currentConfiguration.equals(changed)) { if (changed != null && !currentConfiguration.equals(changed)) {
LOG.info("configuration change in " + configToMonitor + " at: " + new Date(lastModified)); LOG.info("change in " + configToMonitor + " at: " + new Date(lastModified));
LOG.info("current:" + currentConfiguration); LOG.debug("current:" + currentConfiguration);
LOG.info("new :" + changed); LOG.debug("new :" + changed);
processSelectiveChanges(currentConfiguration, changed); processSelectiveChanges(currentConfiguration, changed);
currentConfiguration = changed; currentConfiguration = changed;
} else { } else {
LOG.info("file modification but no material change to configuration in " + configToMonitor + " at: " + new Date(lastModified)); info("No material change to configuration in " + configToMonitor + " at: " + new Date(lastModified));
} }
} }
@ -192,7 +255,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
try { try {
return (List<Object>) o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{}); return (List<Object>) o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
} catch (Exception e) { } catch (Exception e) {
LOG.info("Failed to access getContents for " + o + ", runtime modifications not supported", e); info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
} }
return new ArrayList<Object>(); return new ArrayList<Object>();
} }
@ -203,7 +266,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
Object existing = current.get(currentIndex); Object existing = current.get(currentIndex);
Object candidate = modification.get(modIndex); Object candidate = modification.get(modIndex);
if (! existing.equals(candidate)) { if (! existing.equals(candidate)) {
LOG.info("modification to:" + existing + " , with: " + candidate); info("modification to:" + existing + " , with: " + candidate);
remove(existing); remove(existing);
addNew(candidate); addNew(candidate);
} }
@ -227,9 +290,9 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
if (getBrokerService().removeNetworkConnector(existingCandidate)) { if (getBrokerService().removeNetworkConnector(existingCandidate)) {
try { try {
existingCandidate.stop(); existingCandidate.stop();
LOG.info("stopped and removed networkConnector: " + existingCandidate); info("stopped and removed networkConnector: " + existingCandidate);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to stop removed network connector: " + existingCandidate); info("Failed to stop removed network connector: " + existingCandidate);
} }
} }
} }
@ -247,11 +310,11 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{}); DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors); getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors); ((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
LOG.trace("removed VirtualDestinationInterceptor from: " + interceptorsList); info("removed VirtualDestinationInterceptor from: " + interceptorsList);
} }
}); });
} else { } else {
LOG.info("No runtime support for removal of: " + o); info("No runtime support for removal of: " + o);
} }
} }
@ -284,9 +347,9 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
LOG.trace("applying networkConnector props: " + properties); LOG.trace("applying networkConnector props: " + properties);
IntrospectionSupport.setProperties(nc, properties); IntrospectionSupport.setProperties(nc, properties);
nc.start(); nc.start();
LOG.info("started new network connector: " + nc); info("started new network connector: " + nc);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to add new networkConnector " + networkConnector, e); info("Failed to add new networkConnector " + networkConnector, e);
} }
} }
} else if (o instanceof VirtualDestinationInterceptor) { } else if (o instanceof VirtualDestinationInterceptor) {
@ -303,7 +366,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
(org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor) destinationInterceptor; (org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto)); virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
LOG.trace("applied updates to: " + virtualDestinationInterceptor); info("applied updates to: " + virtualDestinationInterceptor);
updatedExistingInterceptor = true; updatedExistingInterceptor = true;
} }
} }
@ -321,12 +384,12 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{}); DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors); getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors); ((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
LOG.trace("applied new: " + interceptorsList); info("applied new: " + interceptorsList);
} }
} }
}); });
} else { } else {
LOG.info("No runtime support for modifications to " + o); info("No runtime support for modifications to " + o);
} }
} }
@ -381,6 +444,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
try { try {
JAXBContext context = JAXBContext.newInstance(Broker.class); JAXBContext context = JAXBContext.newInstance(Broker.class);
Unmarshaller unMarshaller = context.createUnmarshaller(); Unmarshaller unMarshaller = context.createUnmarshaller();
unMarshaller.setSchema(getSchema());
// skip beans and pull out the broker node to validate // skip beans and pull out the broker node to validate
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
@ -397,19 +461,41 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
lastModified = configToMonitor.lastModified(); lastModified = configToMonitor.lastModified();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to access: " + configToMonitor, e); info("Failed to access: " + configToMonitor, e);
} catch (JAXBException e) { } catch (JAXBException e) {
LOG.error("Failed to parse: " + configToMonitor, e); info("Failed to parse: " + configToMonitor, e);
} catch (ParserConfigurationException e) { } catch (ParserConfigurationException e) {
LOG.error("Failed to document parse: " + configToMonitor, e); info("Failed to document parse: " + configToMonitor, e);
} catch (SAXException e) { } catch (SAXException e) {
LOG.error("Failed to find broker element in: " + configToMonitor, e); info("Failed to find broker element in: " + configToMonitor, e);
} }
} }
return jaxbConfig; return jaxbConfig;
} }
private Schema schema;
private Schema getSchema() throws SAXException {
if (schema == null) {
SchemaFactory schemaFactory = SchemaFactory.newInstance(
XMLConstants.W3C_XML_SCHEMA_NS_URI);
schema = schemaFactory.newSchema(getClass().getResource("/activemq.xsd"));
}
return schema;
}
public void setCheckPeriod(long checkPeriod) { public void setCheckPeriod(long checkPeriod) {
this.checkPeriod = checkPeriod; this.checkPeriod = checkPeriod;
} }
public long getLastModified() {
return lastModified;
}
public Resource getConfigToMonitor() {
return configToMonitor;
}
public long getCheckPeriod() {
return checkPeriod;
}
} }

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin.jmx;
import java.util.Date;
import org.apache.activemq.plugin.RuntimeConfigurationBroker;
import org.springframework.core.io.Resource;
public class RuntimeConfigurationView implements RuntimeConfigurationViewMBean {
private final RuntimeConfigurationBroker runtimeConfigurationBroker;
public RuntimeConfigurationView(RuntimeConfigurationBroker runtimeConfigurationBroker) {
this.runtimeConfigurationBroker = runtimeConfigurationBroker;
}
@Override
public String getUrl() {
Resource value = runtimeConfigurationBroker.getConfigToMonitor();
return value != null ? value.toString() : "null" ;
}
@Override
public String getModified() {
long lastModified = runtimeConfigurationBroker.getLastModified();
if (lastModified > 0) {
return new Date(lastModified).toString();
}
return "unknown";
}
@Override
public String getCheckPeriod() {
return String.valueOf(runtimeConfigurationBroker.getCheckPeriod());
}
@Override
public String updateNow() {
return runtimeConfigurationBroker.updateNow();
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin.jmx;
import org.apache.activemq.broker.jmx.MBeanInfo;
public interface RuntimeConfigurationViewMBean {
@MBeanInfo("Monitored configuration url.")
String getUrl();
@MBeanInfo("Current last modified.")
String getModified();
@MBeanInfo("check period.")
String getCheckPeriod();
@MBeanInfo("force a reread of the configuration url.")
String updateNow();
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.HashMap;
import javax.management.ObjectName;
import org.apache.activemq.plugin.RuntimeConfigurationBroker;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationViewMBean;
import org.apache.activemq.util.IntrospectionSupport;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class MBeanTest extends RuntimeConfigTestSupport {
@Test
public void testUpdateNow() throws Exception {
final String brokerConfig = "mBeanTest-manual-broker";
applyNewConfig(brokerConfig, "emptyManualUpdateConfig");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
applyNewConfig(brokerConfig, "networkConnectorTest-one-nc", SLEEP);
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
// apply via jmx
ObjectName objectName =
new ObjectName(brokerService.getBrokerObjectName().toString() +
RuntimeConfigurationBroker.objectNamePropsAppendage);
RuntimeConfigurationViewMBean runtimeConfigurationView =
(RuntimeConfigurationViewMBean) brokerService.getManagementContext().newProxyInstance(objectName,
RuntimeConfigurationViewMBean.class, false);
HashMap<String, String> props = new HashMap<String, String>();
IntrospectionSupport.getProperties(runtimeConfigurationView, props, null);
LOG.info("mbean attributes before: " + props);
String result = runtimeConfigurationView.updateNow();
LOG.info("Result from update: " + result);
assertTrue("got sensible result", result.contains("started"));
assertEquals("one new network connectors", 1, brokerService.getNetworkConnectors().size());
HashMap<String, String> propsAfter = new HashMap<String, String>();
IntrospectionSupport.getProperties(runtimeConfigurationView, propsAfter, null);
LOG.info("mbean attributes after: " + propsAfter);
String propOfInterest = "modified";
assertNotEquals("modified is different", props.get(propOfInterest), propsAfter.get(propOfInterest));
}
@Test
public void testUpdateFailedMod() throws Exception {
final String brokerConfig = "mBeanTest-manual-broker";
applyNewConfig(brokerConfig, "emptyManualUpdateConfig");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
applyNewConfig(brokerConfig, "parseErrorConfig", SLEEP);
// apply via jmx
ObjectName objectName =
new ObjectName(brokerService.getBrokerObjectName().toString() +
RuntimeConfigurationBroker.objectNamePropsAppendage);
RuntimeConfigurationViewMBean runtimeConfigurationView =
(RuntimeConfigurationViewMBean) brokerService.getManagementContext().newProxyInstance(objectName,
RuntimeConfigurationViewMBean.class, false);
HashMap<String, String> props = new HashMap<String, String>();
IntrospectionSupport.getProperties(runtimeConfigurationView, props, null);
LOG.info("mbean attributes before: " + props);
String result = runtimeConfigurationView.updateNow();
LOG.info("Result from failed update: " + result);
assertTrue("got sensible result", result.contains("dudElement"));
HashMap<String, String> propsAfter = new HashMap<String, String>();
IntrospectionSupport.getProperties(runtimeConfigurationView, propsAfter, null);
LOG.info("mbean attributes after: " + propsAfter);
String propOfInterest = "modified";
assertEquals("modified is same", props.get(propOfInterest), propsAfter.get(propOfInterest));
// apply good change now
applyNewConfig(brokerConfig, "networkConnectorTest-one-nc", SLEEP);
result = runtimeConfigurationView.updateNow();
LOG.info("Result from update: " + result);
assertTrue("got sensible result", result.contains("started"));
assertEquals("one new network connectors", 1, brokerService.getNetworkConnectors().size());
propsAfter = new HashMap<String, String>();
IntrospectionSupport.getProperties(runtimeConfigurationView, propsAfter, null);
assertNotEquals("modified is different", props.get(propOfInterest), propsAfter.get(propOfInterest));
}
}

View File

@ -16,49 +16,21 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class NetworkConnectorTest { public class NetworkConnectorTest extends RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(NetworkConnectorTest.class);
public static final int SLEEP = 4; // seconds
public static final String EMPTY_UPDATABLE_CONFIG = "emptyUpdatableConfig1000" ;
String configurationSeed = "networkConnectorTest"; String configurationSeed = "networkConnectorTest";
BrokerService brokerService;
public void startBroker(String configFileName) throws Exception {
brokerService = new BrokerService();
brokerService = BrokerFactory.createBroker("xbean:org/apache/activemq/" + configFileName + ".xml");
brokerService.start();
brokerService.waitUntilStarted();
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test @Test
public void testNew() throws Exception { public void testNew() throws Exception {
final String brokerConfig = configurationSeed + "-no-nc-broker"; final String brokerConfig = configurationSeed + "-no-nc-broker";
applyNewConfig(brokerConfig, EMPTY_UPDATABLE_CONFIG); applyNewConfig(brokerConfig, EMPTY_UPDATABLE_CONFIG);
startBroker(brokerConfig); startBroker(brokerConfig);
@ -67,7 +39,12 @@ public class NetworkConnectorTest {
applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP); applyNewConfig(brokerConfig, configurationSeed + "-one-nc", SLEEP);
assertEquals("new network connectors", 1, brokerService.getNetworkConnectors().size()); assertTrue("new network connectors", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getNetworkConnectors().size();
}
}));
// apply again - ensure no change // apply again - ensure no change
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0); NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
@ -124,22 +101,4 @@ public class NetworkConnectorTest {
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("name match", "one", remainingNetworkConnector.getName()); assertEquals("name match", "one", remainingNetworkConnector.getName());
} }
private void applyNewConfig(String configName, String newConfigName) throws Exception {
applyNewConfig(configName, newConfigName, 0l);
}
private void applyNewConfig(String configName, String newConfigName, long sleep) throws Exception {
Resource resource = Utils.resourceFromString("org/apache/activemq");
FileOutputStream current = new FileOutputStream(new File(resource.getFile(), configName + ".xml"));
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), newConfigName + ".xml"));
modifications.getChannel().transferTo(0, Long.MAX_VALUE, current.getChannel());
current.flush();
LOG.info("Updated: " + current.getChannel());
if (sleep > 0) {
// wait for mods to kick in
TimeUnit.SECONDS.sleep(sleep);
}
}
} }

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.Utils;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
public class RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigTestSupport.class);
public static final int SLEEP = 4; // seconds
public static final String EMPTY_UPDATABLE_CONFIG = "emptyUpdatableConfig1000" ;
BrokerService brokerService;
public void startBroker(String configFileName) throws Exception {
brokerService = createBroker(configFileName);
brokerService.start();
brokerService.waitUntilStarted();
}
public BrokerService createBroker(String configFileName) throws Exception {
brokerService = new BrokerService();
return BrokerFactory.createBroker("xbean:org/apache/activemq/" + configFileName + ".xml");
}
protected void applyNewConfig(String configName, String newConfigName) throws Exception {
applyNewConfig(configName, newConfigName, 0l);
}
protected void applyNewConfig(String configName, String newConfigName, long sleep) throws Exception {
Resource resource = Utils.resourceFromString("org/apache/activemq");
FileOutputStream current = new FileOutputStream(new File(resource.getFile(), configName + ".xml"));
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), newConfigName + ".xml"));
modifications.getChannel().transferTo(0, Long.MAX_VALUE, current.getChannel());
current.flush();
LOG.info("Updated: " + current.getChannel());
if (sleep > 0) {
// wait for mods to kick in
TimeUnit.SECONDS.sleep(sleep);
}
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
}

View File

@ -16,54 +16,27 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.concurrent.TimeUnit;
import javax.jms.*;
import javax.jms.Message; import javax.jms.Message;
import org.apache.activemq.broker.BrokerFactory; import javax.jms.MessageConsumer;
import org.apache.activemq.broker.BrokerService; import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class VirtualDestTest { public class VirtualDestTest extends RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(VirtualDestTest.class);
public static final int SLEEP = 4; // seconds
String configurationSeed = "virtualDestTest"; String configurationSeed = "virtualDestTest";
BrokerService brokerService;
public void startBroker(String configFileName) throws Exception {
brokerService = createBroker(configFileName);
brokerService.start();
brokerService.waitUntilStarted();
}
public BrokerService createBroker(String configFileName) throws Exception {
brokerService = new BrokerService();
return BrokerFactory.createBroker("xbean:org/apache/activemq/" + configFileName + ".xml");
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test @Test
public void testNew() throws Exception { public void testNew() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker"; final String brokerConfig = configurationSeed + "-new-no-vd-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG); applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG);
startBroker(brokerConfig); startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted()); assertTrue("broker alive", brokerService.isStarted());
@ -98,8 +71,8 @@ public class VirtualDestTest {
@Test @Test
public void testNewNoDefaultVirtualTopicSupport() throws Exception { public void testNewNoDefaultVirtualTopicSupport() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker"; final String brokerConfig = configurationSeed + "-no-vd-vt-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG); applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG);
brokerService = createBroker(brokerConfig); brokerService = createBroker(brokerConfig);
brokerService.setUseVirtualTopics(false); brokerService.setUseVirtualTopics(false);
brokerService.start(); brokerService.start();
@ -128,8 +101,8 @@ public class VirtualDestTest {
@Test @Test
public void testNewWithMirrorQueueSupport() throws Exception { public void testNewWithMirrorQueueSupport() throws Exception {
final String brokerConfig = configurationSeed + "-no-vd-broker"; final String brokerConfig = configurationSeed + "-no-vd-mq-broker";
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG); applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG);
brokerService = createBroker(brokerConfig); brokerService = createBroker(brokerConfig);
brokerService.setUseMirroredQueues(true); brokerService.setUseMirroredQueues(true);
brokerService.start(); brokerService.start();
@ -158,7 +131,7 @@ public class VirtualDestTest {
@Test @Test
public void testRemove() throws Exception { public void testRemove() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker"; final String brokerConfig = configurationSeed + "-one-vd-rm-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd"); applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
startBroker(brokerConfig); startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted()); assertTrue("broker alive", brokerService.isStarted());
@ -173,7 +146,7 @@ public class VirtualDestTest {
exerciseVirtualTopic("A.Default"); exerciseVirtualTopic("A.Default");
applyNewConfig(brokerConfig, NetworkConnectorTest.EMPTY_UPDATABLE_CONFIG, SLEEP); applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG, SLEEP);
// update will happen on addDestination // update will happen on addDestination
forceAddDestination("AnyDest"); forceAddDestination("AnyDest");
@ -197,7 +170,7 @@ public class VirtualDestTest {
@Test @Test
public void testMod() throws Exception { public void testMod() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker"; final String brokerConfig = configurationSeed + "-one-vd-mod-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd"); applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
startBroker(brokerConfig); startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted()); assertTrue("broker alive", brokerService.isStarted());
@ -214,7 +187,7 @@ public class VirtualDestTest {
@Test @Test
public void testModWithMirroredQueue() throws Exception { public void testModWithMirroredQueue() throws Exception {
final String brokerConfig = configurationSeed + "-one-vd-broker"; final String brokerConfig = configurationSeed + "-one-vd-mq-mod-broker";
applyNewConfig(brokerConfig, configurationSeed + "-one-vd"); applyNewConfig(brokerConfig, configurationSeed + "-one-vd");
brokerService = createBroker(brokerConfig); brokerService = createBroker(brokerConfig);
brokerService.setUseMirroredQueues(true); brokerService.setUseMirroredQueues(true);
@ -257,21 +230,4 @@ public class VirtualDestTest {
connection.close(); connection.close();
} }
private void applyNewConfig(String configName, String newConfigName) throws Exception {
applyNewConfig(configName, newConfigName, 0l);
}
private void applyNewConfig(String configName, String newConfigName, long sleep) throws Exception {
Resource resource = Utils.resourceFromString("org/apache/activemq");
FileOutputStream current = new FileOutputStream(new File(resource.getFile(), configName + ".xml"));
FileInputStream modifications = new FileInputStream(new File(resource.getFile(), newConfigName + ".xml"));
modifications.getChannel().transferTo(0, Long.MAX_VALUE, current.getChannel());
current.flush();
LOG.info("Updated: " + current.getChannel());
if (sleep > 0) {
// wait for mods to kick in
TimeUnit.SECONDS.sleep(sleep);
}
}
} }

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
<plugins>
<runtimeConfigurationPlugin />
</plugins>
</broker>
</beans>

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
<plugins>
<runtimeConfigurationPlugin />
</plugins>
<dudElement fromWrongNameSpace="true" />;
</broker>
</beans>