https://issues.apache.org/jira/browse/AMQ-5305 - modify <destination> element with runtime configuration plugin

This commit is contained in:
Dejan Bosanac 2014-08-28 14:08:11 +02:00
parent b76d8318d7
commit bbc039fceb
22 changed files with 1411 additions and 626 deletions

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.security.*;
import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
import org.apache.activemq.schema.core.DtoAuthorizationMap;
import org.apache.activemq.schema.core.DtoAuthorizationEntry;
import java.util.LinkedList;
import java.util.List;
public class AuthorizationPluginProcessor extends DefaultConfigurationProcessor {
public AuthorizationPluginProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void modify(Object existing, Object candidate) {
try {
// replace authorization map - need exclusive write lock to total broker
AuthorizationBroker authorizationBroker =
(AuthorizationBroker) plugin.getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class);
authorizationBroker.setAuthorizationMap(fromDto(filter(candidate, DtoAuthorizationPlugin.Map.class)));
} catch (Exception e) {
plugin.info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
}
}
private AuthorizationMap fromDto(List<Object> map) {
XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap();
for (Object o : map) {
if (o instanceof DtoAuthorizationPlugin.Map) {
DtoAuthorizationPlugin.Map dtoMap = (DtoAuthorizationPlugin.Map) o;
List<DestinationMapEntry> entries = new LinkedList<DestinationMapEntry>();
// revisit - would like to map getAuthorizationMap to generic getContents
for (Object authMap : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.AuthorizationEntries.class)) {
for (Object entry : filter(getContents(authMap), DtoAuthorizationEntry.class)) {
entries.add(fromDto(entry, new XBeanAuthorizationEntry()));
}
}
xBeanAuthorizationMap.setAuthorizationEntries(entries);
try {
xBeanAuthorizationMap.afterPropertiesSet();
} catch (Exception e) {
plugin.info("failed to update xBeanAuthorizationMap auth entries:", e);
}
for (Object entry : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.TempDestinationAuthorizationEntry.class)) {
// another restriction - would like to be getContents
DtoAuthorizationMap.TempDestinationAuthorizationEntry dtoEntry = (DtoAuthorizationMap.TempDestinationAuthorizationEntry) entry;
xBeanAuthorizationMap.setTempDestinationAuthorizationEntry(fromDto(dtoEntry.getTempDestinationAuthorizationEntry(), new TempDestinationAuthorizationEntry()));
}
} else {
plugin.info("No support for updates to: " + o);
}
}
return xBeanAuthorizationMap;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.util.List;
import org.apache.activemq.schema.core.DtoBroker;
public interface ConfigurationProcessor {
public void processChanges(List current, List modified);
public void processChanges(DtoBroker current, DtoBroker modified);
public void modify(Object existing, Object candidate);
public void addNew(Object o);
public void remove(Object o);
public ConfigurationProcessor findProcessor(Object o);
}

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBElement;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.activemq.schema.core.DtoBroker;
public class DefaultConfigurationProcessor implements ConfigurationProcessor {
public static final Logger LOG = LoggerFactory.getLogger(DefaultConfigurationProcessor.class);
RuntimeConfigurationBroker plugin;
Class configurationClass;
Pattern matchPassword = Pattern.compile("password=.*,");
public DefaultConfigurationProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
this.plugin = plugin;
this.configurationClass = configurationClass;
}
@Override
public void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) {
List current = filter(currentConfiguration, configurationClass);
List modified = filter(modifiedConfiguration, configurationClass);
if (current.equals(modified)) {
plugin.debug("no changes to " + configurationClass.getSimpleName());
return;
} else {
plugin.info("changes to " + configurationClass.getSimpleName());
}
processChanges(current, modified);
}
public void processChanges(List current, List modified) {
int modIndex = 0, currentIndex = 0;
for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
// walk the list for mods
applyModifications(getContents(current.get(currentIndex)),
getContents(modified.get(modIndex)));
}
for (; modIndex < modified.size(); modIndex++) {
// new element; add all
for (Object nc : getContents(modified.get(modIndex))) {
ConfigurationProcessor processor = findProcessor(nc);
if (processor != null) {
processor.addNew(nc);
} else {
addNew(nc);
}
}
}
for (; currentIndex < current.size(); currentIndex++) {
// removal of element; remove all
for (Object nc : getContents(current.get(currentIndex))) {
ConfigurationProcessor processor = findProcessor(nc);
if (processor != null) {
processor.remove(nc);
} else {
remove(nc);
}
}
}
}
protected void applyModifications(List<Object> current, List<Object> modification) {
int modIndex = 0, currentIndex = 0;
for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
Object existing = current.get(currentIndex);
Object candidate = modification.get(modIndex);
if (!existing.equals(candidate)) {
plugin.info("modification to:" + existing + " , with: " + candidate);
ConfigurationProcessor processor = findProcessor(existing);
if (processor != null) {
processor.modify(existing, candidate);
} else {
modify(existing, candidate);
}
}
}
for (; modIndex < modification.size(); modIndex++) {
Object mod = modification.get(modIndex);
ConfigurationProcessor processor = findProcessor(mod);
if (processor != null) {
processor.addNew(mod);
} else {
addNew(mod);
}
}
for (; currentIndex < current.size(); currentIndex++) {
Object mod = current.get(currentIndex);
ConfigurationProcessor processor = findProcessor(mod);
if (processor != null) {
processor.remove(mod);
} else {
remove(mod);
}
}
}
public void modify(Object existing, Object candidate) {
remove(existing);
addNew(candidate);
}
public void addNew(Object o) {
plugin.info("No runtime support for additions of " + o);
}
public void remove(Object o) {
plugin.info("No runtime support for removal of: " + o);
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
plugin.info("No processor for " + o);
return null;
}
// mapping all supported updatable elements to support getContents
protected List<Object> getContents(Object o) {
List<Object> answer = new ArrayList<Object>();
try {
Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
if (val instanceof List) {
answer = (List<Object>) val;
} else {
answer.add(val);
}
} catch (NoSuchMethodException mappingIncomplete) {
plugin.debug(filterPasswords(o) + " has no modifiable elements");
} catch (Exception e) {
plugin.info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
}
return answer;
}
protected String filterPasswords(Object toEscape) {
return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
}
protected <T> List<Object> filter(Object obj, Class<T> type) {
return filter(getContents(obj), type);
}
protected <T> List<Object> filter(List<Object> objectList, Class<T> type) {
List<Object> result = new LinkedList<Object>();
for (Object o : objectList) {
if (o instanceof JAXBElement) {
JAXBElement element = (JAXBElement) o;
if (type.isAssignableFrom(element.getDeclaredType())) {
result.add((T) element.getValue());
}
} else if (type.isAssignableFrom(o.getClass())) {
result.add((T) o);
}
}
return result;
}
protected <T> T fromDto(Object dto, T instance) {
Properties properties = new Properties();
IntrospectionSupport.getProperties(dto, properties, null);
plugin.placeHolderUtil.filter(properties);
LOG.trace("applying props: " + filterPasswords(properties) + ", to " + instance.getClass().getSimpleName());
IntrospectionSupport.setProperties(instance, properties);
// deal with nested elements
for (Object nested : filter(dto, Object.class)) {
String elementName = nested.getClass().getSimpleName();
Method setter = JAXBUtils.findSetter(instance, elementName);
if (setter != null) {
List<Object> argument = new LinkedList<Object>();
for (Object elementContent : filter(nested, Object.class)) {
argument.add(fromDto(elementContent, JAXBUtils.inferTargetObject(elementContent)));
}
try {
setter.invoke(instance, JAXBUtils.matchType(argument, setter.getParameterTypes()[0]));
} catch (Exception e) {
plugin.info("failed to invoke " + setter + " on " + instance, e);
}
} else {
plugin.info("failed to find setter for " + elementName + " on :" + instance);
}
}
return instance;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
public class DestinationInterceptorProcessor extends DefaultConfigurationProcessor {
public DestinationInterceptorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
if (o instanceof DtoVirtualDestinationInterceptor) {
return new VirtualDestinationInterceptorProcessor(plugin, o.getClass());
}
return super.findProcessor(o);
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.schema.core.DtoPolicyEntry;
import org.apache.activemq.schema.core.DtoPolicyMap;
public class DestinationPolicyProcessor extends DefaultConfigurationProcessor {
public DestinationPolicyProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
if (o instanceof DtoPolicyEntry) {
return new PolicyEntryProcessor(plugin, o.getClass());
} else if (o instanceof DtoPolicyMap) {
return new PolicyMapProcessor(plugin, o.getClass());
}
return super.findProcessor(o);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.schema.core.DtoQueue;
import org.apache.activemq.schema.core.DtoTopic;
import java.util.Arrays;
import java.util.List;
public class DestinationsProcessor extends DefaultConfigurationProcessor {
public DestinationsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void processChanges(List current, List modified) {
for (Object destinations : modified) {
for (Object dto : getContents(destinations)) {
try {
ActiveMQDestination destination = createDestination(dto);
if (!containsDestination(destination)) {
plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
plugin.info("Added destination " + destination);
}
} catch (Exception e) {
plugin.info("Failed to add a new destination for DTO: " + dto, e);
}
}
}
}
protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
return Arrays.asList(plugin.getBrokerService().getRegionBroker().getDestinations()).contains(destination);
}
@Override
public void addNew(Object o) {
try {
ActiveMQDestination destination = createDestination(o);
plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
plugin.info("Added destination " + destination);
} catch (Exception e) {
plugin.info("Failed to add a new destination for DTO: " + o, e);
}
}
private ActiveMQDestination createDestination(Object dto) throws Exception {
if (dto instanceof DtoQueue) {
return new ActiveMQQueue(((DtoQueue) dto).getPhysicalName());
} else if (dto instanceof DtoTopic) {
return new ActiveMQTopic(((DtoTopic) dto).getPhysicalName());
} else {
throw new Exception("Unknown destination type for DTO " + dto);
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import javax.xml.bind.JAXBElement;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.schema.core.DtoTopic;
import org.apache.activemq.schema.core.DtoQueue;
import org.apache.activemq.schema.core.DtoAuthenticationUser;
import org.apache.activemq.security.AuthenticationUser;
public class JAXBUtils {
public static Method findSetter(Object instance, String elementName) {
String setter = "set" + elementName;
for (Method m : instance.getClass().getMethods()) {
if (setter.equals(m.getName())) {
return m;
}
}
return null;
}
public static Object inferTargetObject(Object elementContent) {
if (DtoTopic.class.isAssignableFrom(elementContent.getClass())) {
return new ActiveMQTopic();
} else if (DtoQueue.class.isAssignableFrom(elementContent.getClass())) {
return new ActiveMQQueue();
} else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) {
return new AuthenticationUser();
} else {
return new Object();
}
}
public static Object matchType(List<Object> parameterValues, Class<?> aClass) {
Object result = parameterValues;
if (Set.class.isAssignableFrom(aClass)) {
result = new HashSet(parameterValues);
}
return result;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.schema.core.DtoNetworkConnector;
import org.apache.activemq.util.IntrospectionSupport;
import java.util.TreeMap;
public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void addNew(Object o) {
DtoNetworkConnector networkConnector = (DtoNetworkConnector) o;
if (networkConnector.getUri() != null) {
try {
DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
plugin.getBrokerService().addNetworkConnector(nc);
nc.start();
plugin.info("started new network connector: " + nc);
} catch (Exception e) {
plugin.info("Failed to add new networkConnector " + networkConnector, e);
}
}
}
@Override
public void remove(Object o) {
DtoNetworkConnector toRemove = (DtoNetworkConnector) o;
for (NetworkConnector existingCandidate :
plugin.getBrokerService().getNetworkConnectors()) {
if (configMatch(toRemove, existingCandidate)) {
if (plugin.getBrokerService().removeNetworkConnector(existingCandidate)) {
try {
existingCandidate.stop();
plugin.info("stopped and removed networkConnector: " + existingCandidate);
} catch (Exception e) {
plugin.info("Failed to stop removed network connector: " + existingCandidate);
}
}
}
}
}
private boolean configMatch(DtoNetworkConnector dto, NetworkConnector candidate) {
TreeMap<String, String> dtoProps = new TreeMap<String, String>();
IntrospectionSupport.getProperties(dto, dtoProps, null);
TreeMap<String, String> candidateProps = new TreeMap<String, String>();
IntrospectionSupport.getProperties(candidate, candidateProps, null);
// every dto prop must be present in the candidate
for (String key : dtoProps.keySet()) {
if (!candidateProps.containsKey(key) || !candidateProps.get(key).equals(dtoProps.get(key))) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.schema.core.DtoNetworkConnector;
public class NetworkConnectorsProcessor extends DefaultConfigurationProcessor {
public NetworkConnectorsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
if (o instanceof DtoNetworkConnector) {
return new NetworkConnectorProcessor(plugin, o.getClass());
}
return super.findProcessor(o);
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.schema.core.DtoSimpleAuthenticationPlugin;
import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
public class PluginsProcessor extends DefaultConfigurationProcessor {
public PluginsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
if (o instanceof DtoSimpleAuthenticationPlugin) {
return new SimpleAuthenticationPluginProcessor(plugin, o.getClass());
} else if (o instanceof DtoAuthorizationPlugin) {
return new AuthorizationPluginProcessor(plugin, o.getClass());
}
return super.findProcessor(o);
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import java.util.Set;
public class PolicyEntryProcessor extends DefaultConfigurationProcessor {
public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void addNew(Object o) {
PolicyEntry addition = fromDto(o, new PolicyEntry());
PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
existingMap.put(addition.getDestination(), addition);
applyRetrospectively(addition);
plugin.info("added policy for: " + addition.getDestination());
}
@Override
public void modify(Object existing, Object candidate) {
PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
Set existingEntry = existingMap.get(updatedEntry.getDestination());
if (existingEntry.size() == 1) {
updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
applyRetrospectively(updatedEntry);
plugin.info("updated policy for: " + updatedEntry.getDestination());
} else {
plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
}
}
protected void applyRetrospectively(PolicyEntry updatedEntry) {
RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
Destination target = destination;
if (destination instanceof DestinationFilter) {
target = ((DestinationFilter)destination).getNext();
}
if (target.getActiveMQDestination().isQueue()) {
updatedEntry.update((Queue) target);
} else if (target.getActiveMQDestination().isTopic()) {
updatedEntry.update((Topic) target);
}
plugin.debug("applied update to:" + target);
}
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.util.List;
import org.apache.activemq.schema.core.DtoPolicyMap;
import org.apache.activemq.schema.core.DtoPolicyEntry;
public class PolicyMapProcessor extends DefaultConfigurationProcessor {
public PolicyMapProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void modify(Object existing, Object candidate) {
List<Object> existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class);
List<Object> candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class);
// walk the map for mods
applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0)));
}
@Override
public ConfigurationProcessor findProcessor(Object o) {
if (o instanceof DtoPolicyEntry) {
return new PolicyEntryProcessor(plugin, o.getClass());
}
return super.findProcessor(o);
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.schema.core.DtoBroker;
public class ProcessorFactory {
public static ConfigurationProcessor createProcessor(RuntimeConfigurationBroker plugin, Class dtoClass) {
if (dtoClass.equals(DtoBroker.Plugins.class)) {
return new PluginsProcessor(plugin, dtoClass);
} else if (dtoClass.equals(DtoBroker.NetworkConnectors.class)) {
return new NetworkConnectorsProcessor(plugin, dtoClass);
} else if (dtoClass.equals(DtoBroker.DestinationPolicy.class)) {
return new DestinationPolicyProcessor(plugin, dtoClass);
} else if (dtoClass.equals(DtoBroker.DestinationInterceptors.class)) {
return new DestinationInterceptorProcessor(plugin, dtoClass);
} else if (dtoClass.equals(DtoBroker.Destinations.class)) {
return new DestinationsProcessor(plugin, dtoClass);
} else {
return new DefaultConfigurationProcessor(plugin, dtoClass);
}
}
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.spring.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class PropertiesPlaceHolderUtil {
public static final Logger LOG = LoggerFactory.getLogger(PropertiesPlaceHolderUtil.class);
static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}");
final Properties properties;
public PropertiesPlaceHolderUtil(Properties properties) {
this.properties = properties;
}
public void filter(Properties toFilter) {
for (Map.Entry<Object, Object> entry : toFilter.entrySet()) {
String val = (String) entry.getValue();
String newVal = filter(val);
if (!val.equals(newVal)) {
toFilter.put(entry.getKey(), newVal);
}
}
}
public String filter(String str) {
int start = 0;
while (true) {
Matcher matcher = pattern.matcher(str);
if (!matcher.find(start)) {
break;
}
String group = matcher.group(1);
String property = properties.getProperty(group);
if (property != null) {
str = matcher.replaceFirst(Matcher.quoteReplacement(property));
} else {
start = matcher.end();
}
}
return replaceBytePostfix(str);
}
static Pattern[] byteMatchers = new Pattern[] {
Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE)};
// xbean can Xb, Xkb, Xmb, Xg etc
private String replaceBytePostfix(String str) {
try {
for (int i=0; i< byteMatchers.length; i++) {
Matcher matcher = byteMatchers[i].matcher(str);
if (matcher.matches()) {
long value = Long.parseLong(matcher.group(1));
for (int j=1; j<=i; j++) {
value *= 1024;
}
return String.valueOf(value);
}
}
} catch (NumberFormatException ignored) {
LOG.debug("nfe on: " + str, ignored);
}
return str;
}
public void mergeProperties(Document doc, Properties initialProperties, BrokerContext brokerContext) {
// find resources
// <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
// <property name="locations" || name="properties">
// ...
// </property>
// </bean>
LinkedList<String> resources = new LinkedList<String>();
LinkedList<String> propertiesClazzes = new LinkedList<String>();
NodeList beans = doc.getElementsByTagNameNS("*", "bean");
for (int i = 0; i < beans.getLength(); i++) {
Node bean = beans.item(0);
if (bean.hasAttributes() && bean.getAttributes().getNamedItem("class").getTextContent().contains("PropertyPlaceholderConfigurer")) {
if (bean.hasChildNodes()) {
NodeList beanProps = bean.getChildNodes();
for (int j = 0; j < beanProps.getLength(); j++) {
Node beanProp = beanProps.item(j);
if (Node.ELEMENT_NODE == beanProp.getNodeType() && beanProp.hasAttributes() && beanProp.getAttributes().getNamedItem("name") != null) {
String propertyName = beanProp.getAttributes().getNamedItem("name").getTextContent();
if ("locations".equals(propertyName)) {
// interested in value or list/value of locations property
Element beanPropElement = (Element) beanProp;
NodeList values = beanPropElement.getElementsByTagNameNS("*", "value");
for (int k = 0; k < values.getLength(); k++) {
Node value = values.item(k);
resources.add(value.getFirstChild().getTextContent());
}
} else if ("properties".equals(propertyName)) {
// bean or beanFactory
Element beanPropElement = (Element) beanProp;
NodeList values = beanPropElement.getElementsByTagNameNS("*", "bean");
for (int k = 0; k < values.getLength(); k++) {
Node value = values.item(k);
if (value.hasAttributes()) {
Node beanClassTypeNode = value.getAttributes().getNamedItem("class");
if (beanClassTypeNode != null) {
propertiesClazzes.add(beanClassTypeNode.getFirstChild().getTextContent());
}
}
}
}
}
}
}
}
}
for (String value : propertiesClazzes) {
try {
Object springBean = getClass().getClassLoader().loadClass(value).newInstance();
if (springBean instanceof FactoryBean) {
// can't access the factory or created properties from spring context so we got to recreate
initialProperties.putAll((Properties) FactoryBean.class.getMethod("getObject", (Class<?>[]) null).invoke(springBean));
}
} catch (Throwable e) {
LOG.debug("unexpected exception processing properties bean class: " + propertiesClazzes, e);
}
}
List<Resource> propResources = new LinkedList<Resource>();
for (String value : resources) {
try {
if (!value.isEmpty()) {
propResources.add(Utils.resourceFromString(filter(value)));
}
} catch (MalformedURLException e) {
LOG.info("failed to resolve resource: " + value, e);
}
}
for (Resource resource : propResources) {
Properties properties = new Properties();
try {
properties.load(resource.getInputStream());
} catch (IOException e) {
LOG.info("failed to load properties resource: " + resource, e);
}
initialProperties.putAll(properties);
}
}
}

View File

@ -16,23 +16,24 @@
*/
package org.apache.activemq.plugin;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.spring.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
import javax.management.JMException;
import javax.management.ObjectName;
import javax.xml.XMLConstants;
@ -47,63 +48,13 @@ import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.DtoAuthenticationUser;
import org.apache.activemq.schema.core.DtoAuthorizationEntry;
import org.apache.activemq.schema.core.DtoAuthorizationMap;
import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.schema.core.DtoCompositeQueue;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoNetworkConnector;
import org.apache.activemq.schema.core.DtoPolicyEntry;
import org.apache.activemq.schema.core.DtoPolicyMap;
import org.apache.activemq.schema.core.DtoQueue;
import org.apache.activemq.schema.core.DtoSimpleAuthenticationPlugin;
import org.apache.activemq.schema.core.DtoTopic;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
import org.apache.activemq.schema.core.DtoVirtualTopic;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationBroker;
import org.apache.activemq.security.AuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationBroker;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.apache.activemq.security.XBeanAuthorizationEntry;
import org.apache.activemq.security.XBeanAuthorizationMap;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.xml.PluggableSchemaResolver;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
public class RuntimeConfigurationBroker extends BrokerFilter {
@ -117,8 +68,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private Resource configToMonitor;
private DtoBroker currentConfiguration;
private Runnable monitorTask;
private ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
private ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
private ObjectName objectName;
private String infoString;
private Schema schema;
@ -259,7 +210,11 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
private void info(String s) {
protected void debug(String s) {
LOG.debug(s);
}
protected void info(String s) {
LOG.info(filterPasswords(s));
if (infoString != null) {
infoString += s;
@ -267,7 +222,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
private void info(String s, Throwable t) {
protected void info(String s, Throwable t) {
LOG.info(filterPasswords(s), t);
if (infoString != null) {
infoString += s;
@ -295,352 +250,15 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
DtoBroker.DestinationPolicy.class,
DtoBroker.NetworkConnectors.class,
DtoBroker.DestinationInterceptors.class,
DtoBroker.Plugins.class}) {
DtoBroker.Plugins.class,
DtoBroker.Destinations.class}) {
processChanges(currentConfiguration, modifiedConfiguration, upDatable);
}
}
private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) {
List current = filter(currentConfiguration, upDatable);
List modified = filter(modifiedConfiguration, upDatable);
if (current.equals(modified)) {
LOG.debug("no changes to " + upDatable.getSimpleName());
return;
} else {
info("changes to " + upDatable.getSimpleName());
}
int modIndex = 0, currentIndex = 0;
for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
// walk the list for mods
applyModifications(getContents(current.get(currentIndex)),
getContents(modified.get(modIndex)));
}
for (; modIndex < modified.size(); modIndex++) {
// new element; add all
for (Object nc : getContents(modified.get(modIndex))) {
addNew(nc);
}
}
for (; currentIndex < current.size(); currentIndex++) {
// removal of element; remove all
for (Object nc : getContents(current.get(currentIndex))) {
remove(nc);
}
}
}
// mapping all supported updatable elements to support getContents
private List<Object> getContents(Object o) {
List<Object> answer = new ArrayList<Object>();
try {
Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
if (val instanceof List) {
answer = (List<Object>) val;
} else {
answer.add(val);
}
} catch (NoSuchMethodException mappingIncomplete) {
LOG.debug(filterPasswords(o) + " has no modifiable elements");
} catch (Exception e) {
info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
}
return answer;
}
private void applyModifications(List<Object> current, List<Object> modification) {
int modIndex = 0, currentIndex = 0;
for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
Object existing = current.get(currentIndex);
Object candidate = modification.get(modIndex);
if (!existing.equals(candidate)) {
info("modification to:" + existing + " , with: " + candidate);
modify(existing, candidate);
}
}
for (; modIndex < modification.size(); modIndex++) {
addNew(modification.get(modIndex));
}
for (; currentIndex < current.size(); currentIndex++) {
remove(current.get(currentIndex));
}
}
private void modify(Object existing, Object candidate) {
if (candidate instanceof DtoAuthorizationPlugin) {
try {
// replace authorization map - need exclusive write lock to total broker
AuthorizationBroker authorizationBroker =
(AuthorizationBroker) getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class);
authorizationBroker.setAuthorizationMap(fromDto(filter(candidate, DtoAuthorizationPlugin.Map.class)));
} catch (Exception e) {
info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
}
} else if (candidate instanceof DtoSimpleAuthenticationPlugin) {
try {
final SimpleAuthenticationPlugin updatedPlugin = fromDto(candidate, new SimpleAuthenticationPlugin());
final SimpleAuthenticationBroker authenticationBroker =
(SimpleAuthenticationBroker) getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class);
addConnectionWork.add(new Runnable() {
public void run() {
authenticationBroker.setUserGroups(updatedPlugin.getUserGroups());
authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords());
authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed());
authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser());
authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup());
}
});
} catch (Exception e) {
info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e);
}
} else if (candidate instanceof DtoPolicyMap) {
List<Object> existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class);
List<Object> candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class);
// walk the map for mods
applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0)));
} else if (candidate instanceof DtoPolicyEntry) {
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
Set existingEntry = existingMap.get(updatedEntry.getDestination());
if (existingEntry.size() == 1) {
updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
applyRetrospectively(updatedEntry);
info("updated policy for: " + updatedEntry.getDestination());
} else {
info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
}
} else {
remove(existing);
addNew(candidate);
}
}
private void applyRetrospectively(PolicyEntry updatedEntry) {
RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
Destination target = destination;
if (destination instanceof DestinationFilter) {
target = ((DestinationFilter)destination).getNext();
}
if (target.getActiveMQDestination().isQueue()) {
updatedEntry.update((Queue) target);
} else if (target.getActiveMQDestination().isTopic()) {
updatedEntry.update((Topic) target);
}
LOG.debug("applied update to:" + target);
}
}
private AuthorizationMap fromDto(List<Object> map) {
XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap();
for (Object o : map) {
if (o instanceof DtoAuthorizationPlugin.Map) {
DtoAuthorizationPlugin.Map dtoMap = (DtoAuthorizationPlugin.Map) o;
List<DestinationMapEntry> entries = new LinkedList<DestinationMapEntry>();
// revisit - would like to map getAuthorizationMap to generic getContents
for (Object authMap : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.AuthorizationEntries.class)) {
for (Object entry : filter(getContents(authMap), DtoAuthorizationEntry.class)) {
entries.add(fromDto(entry, new XBeanAuthorizationEntry()));
}
}
xBeanAuthorizationMap.setAuthorizationEntries(entries);
try {
xBeanAuthorizationMap.afterPropertiesSet();
} catch (Exception e) {
info("failed to update xBeanAuthorizationMap auth entries:", e);
}
for (Object entry : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.TempDestinationAuthorizationEntry.class)) {
// another restriction - would like to be getContents
DtoAuthorizationMap.TempDestinationAuthorizationEntry dtoEntry = (DtoAuthorizationMap.TempDestinationAuthorizationEntry) entry;
xBeanAuthorizationMap.setTempDestinationAuthorizationEntry(fromDto(dtoEntry.getTempDestinationAuthorizationEntry(), new TempDestinationAuthorizationEntry()));
}
} else {
info("No support for updates to: " + o);
}
}
return xBeanAuthorizationMap;
}
private void remove(Object o) {
if (o instanceof DtoNetworkConnector) {
DtoNetworkConnector toRemove = (DtoNetworkConnector) o;
for (NetworkConnector existingCandidate :
getBrokerService().getNetworkConnectors()) {
if (configMatch(toRemove, existingCandidate)) {
if (getBrokerService().removeNetworkConnector(existingCandidate)) {
try {
existingCandidate.stop();
info("stopped and removed networkConnector: " + existingCandidate);
} catch (Exception e) {
info("Failed to stop removed network connector: " + existingCandidate);
}
}
}
}
} else if (o instanceof DtoVirtualDestinationInterceptor) {
// whack it
addDestinationWork.add(new Runnable() {
public void run() {
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
for (DestinationInterceptor candidate : getBrokerService().getDestinationInterceptors()) {
if (!(candidate instanceof VirtualDestinationInterceptor)) {
interceptorsList.add(candidate);
}
}
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
info("removed VirtualDestinationInterceptor from: " + interceptorsList);
}
});
} else {
info("No runtime support for removal of: " + o);
}
}
private boolean configMatch(DtoNetworkConnector dto, NetworkConnector candidate) {
TreeMap<String, String> dtoProps = new TreeMap<String, String>();
IntrospectionSupport.getProperties(dto, dtoProps, null);
TreeMap<String, String> candidateProps = new TreeMap<String, String>();
IntrospectionSupport.getProperties(candidate, candidateProps, null);
// every dto prop must be present in the candidate
for (String key : dtoProps.keySet()) {
if (!candidateProps.containsKey(key) || !candidateProps.get(key).equals(dtoProps.get(key))) {
return false;
}
}
return true;
}
private void addNew(Object o) {
if (o instanceof DtoNetworkConnector) {
DtoNetworkConnector networkConnector = (DtoNetworkConnector) o;
if (networkConnector.getUri() != null) {
try {
DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
getBrokerService().addNetworkConnector(nc);
nc.start();
info("started new network connector: " + nc);
} catch (Exception e) {
info("Failed to add new networkConnector " + networkConnector, e);
}
}
} else if (o instanceof DtoVirtualDestinationInterceptor) {
final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
addDestinationWork.add(new Runnable() {
public void run() {
boolean updatedExistingInterceptor = false;
RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
for (DestinationInterceptor destinationInterceptor : getBrokerService().getDestinationInterceptors()) {
if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
// update existing interceptor
final VirtualDestinationInterceptor virtualDestinationInterceptor =
(VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
info("applied updates to: " + virtualDestinationInterceptor);
updatedExistingInterceptor = true;
}
}
if (!updatedExistingInterceptor) {
// add
VirtualDestinationInterceptor virtualDestinationInterceptor =
new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
interceptorsList.addAll(Arrays.asList(getBrokerService().getDestinationInterceptors()));
interceptorsList.add(virtualDestinationInterceptor);
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
info("applied new: " + interceptorsList);
}
regionBroker.reapplyInterceptor();
}
});
} else if (o instanceof DtoPolicyEntry) {
PolicyEntry addition = fromDto(o, new PolicyEntry());
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
existingMap.put(addition.getDestination(), addition);
applyRetrospectively(addition);
info("added policy for: " + addition.getDestination());
} else {
info("No runtime support for additions of " + o);
}
}
private VirtualDestination[] fromDto(DtoVirtualDestinationInterceptor virtualDestinationInterceptor) {
List<VirtualDestination> answer = new ArrayList<VirtualDestination>();
for (Object vd : filter(virtualDestinationInterceptor, DtoVirtualDestinationInterceptor.VirtualDestinations.class)) {
for (Object vt : filter(vd, DtoVirtualTopic.class)) {
answer.add(fromDto(vt, new VirtualTopic()));
}
for (Object vt : filter(vd, DtoCompositeTopic.class)) {
answer.add(fromDto(vt, new CompositeTopic()));
}
for (Object vt : filter(vd, DtoCompositeQueue.class)) {
answer.add(fromDto(vt, new CompositeQueue()));
}
}
VirtualDestination[] array = new VirtualDestination[answer.size()];
answer.toArray(array);
return array;
}
private <T> T fromDto(Object dto, T instance) {
Properties properties = new Properties();
IntrospectionSupport.getProperties(dto, properties, null);
replacePlaceHolders(properties);
LOG.trace("applying props: " + filterPasswords(properties) + ", to " + instance.getClass().getSimpleName());
IntrospectionSupport.setProperties(instance, properties);
// deal with nested elements
for (Object nested : filter(dto, Object.class)) {
String elementName = nested.getClass().getSimpleName();
Method setter = findSetter(instance, elementName);
if (setter != null) {
List<Object> argument = new LinkedList<Object>();
for (Object elementContent : filter(nested, Object.class)) {
argument.add(fromDto(elementContent, inferTargetObject(elementContent)));
}
try {
setter.invoke(instance, matchType(argument, setter.getParameterTypes()[0]));
} catch (Exception e) {
info("failed to invoke " + setter + " on " + instance, e);
}
} else {
info("failed to find setter for " + elementName + " on :" + instance);
}
}
return instance;
ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable);
processor.processChanges(currentConfiguration, modifiedConfiguration);
}
Pattern matchPassword = Pattern.compile("password=.*,");
@ -648,56 +266,6 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
}
private Object matchType(List<Object> parameterValues, Class<?> aClass) {
Object result = parameterValues;
if (Set.class.isAssignableFrom(aClass)) {
result = new HashSet(parameterValues);
}
return result;
}
private Object inferTargetObject(Object elementContent) {
if (DtoTopic.class.isAssignableFrom(elementContent.getClass())) {
return new ActiveMQTopic();
} else if (DtoQueue.class.isAssignableFrom(elementContent.getClass())) {
return new ActiveMQQueue();
} else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) {
return new AuthenticationUser();
} else {
info("update not supported for dto: " + elementContent);
return new Object();
}
}
private Method findSetter(Object instance, String elementName) {
String setter = "set" + elementName;
for (Method m : instance.getClass().getMethods()) {
if (setter.equals(m.getName())) {
return m;
}
}
return null;
}
private <T> List<Object> filter(Object obj, Class<T> type) {
return filter(getContents(obj), type);
}
private <T> List<Object> filter(List<Object> objectList, Class<T> type) {
List<Object> result = new LinkedList<Object>();
for (Object o : objectList) {
if (o instanceof JAXBElement) {
JAXBElement element = (JAXBElement) o;
if (type.isAssignableFrom(element.getDeclaredType())) {
result.add((T) element.getValue());
}
} else if (type.isAssignableFrom(o.getClass())) {
result.add((T) o);
}
}
return result;
}
private DtoBroker loadConfiguration(Resource configToMonitor) {
DtoBroker jaxbConfig = null;
if (configToMonitor != null) {
@ -748,103 +316,10 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
if (brokerContext != null) {
Properties initialProperties = new Properties(System.getProperties());
placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties);
mergeProperties(doc, initialProperties, brokerContext);
placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext);
}
}
private void mergeProperties(Document doc, Properties initialProperties, BrokerContext brokerContext) {
// find resources
// <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
// <property name="locations" || name="properties">
// ...
// </property>
// </bean>
LinkedList<String> resources = new LinkedList<String>();
LinkedList<String> propertiesClazzes = new LinkedList<String>();
NodeList beans = doc.getElementsByTagNameNS("*", "bean");
for (int i = 0; i < beans.getLength(); i++) {
Node bean = beans.item(0);
if (bean.hasAttributes() && bean.getAttributes().getNamedItem("class").getTextContent().contains("PropertyPlaceholderConfigurer")) {
if (bean.hasChildNodes()) {
NodeList beanProps = bean.getChildNodes();
for (int j = 0; j < beanProps.getLength(); j++) {
Node beanProp = beanProps.item(j);
if (Node.ELEMENT_NODE == beanProp.getNodeType() && beanProp.hasAttributes() && beanProp.getAttributes().getNamedItem("name") != null) {
String propertyName = beanProp.getAttributes().getNamedItem("name").getTextContent();
if ("locations".equals(propertyName)) {
// interested in value or list/value of locations property
Element beanPropElement = (Element) beanProp;
NodeList values = beanPropElement.getElementsByTagNameNS("*", "value");
for (int k = 0; k < values.getLength(); k++) {
Node value = values.item(k);
resources.add(value.getFirstChild().getTextContent());
}
} else if ("properties".equals(propertyName)) {
// bean or beanFactory
Element beanPropElement = (Element) beanProp;
NodeList values = beanPropElement.getElementsByTagNameNS("*", "bean");
for (int k = 0; k < values.getLength(); k++) {
Node value = values.item(k);
if (value.hasAttributes()) {
Node beanClassTypeNode = value.getAttributes().getNamedItem("class");
if (beanClassTypeNode != null) {
propertiesClazzes.add(beanClassTypeNode.getFirstChild().getTextContent());
}
}
}
}
}
}
}
}
}
for (String value : propertiesClazzes) {
try {
Object springBean = getClass().getClassLoader().loadClass(value).newInstance();
if (springBean instanceof FactoryBean) {
// can't access the factory or created properties from spring context so we got to recreate
initialProperties.putAll((Properties) FactoryBean.class.getMethod("getObject", (Class<?>[]) null).invoke(springBean));
}
} catch (Throwable e) {
LOG.debug("unexpected exception processing properties bean class: " + propertiesClazzes, e);
}
}
List<Resource> propResources = new LinkedList<Resource>();
for (String value : resources) {
try {
if (!value.isEmpty()) {
propResources.add(Utils.resourceFromString(replacePlaceHolders(value)));
}
} catch (MalformedURLException e) {
info("failed to resolve resource: " + value, e);
}
}
for (Resource resource : propResources) {
Properties properties = new Properties();
try {
properties.load(resource.getInputStream());
} catch (IOException e) {
info("failed to load properties resource: " + resource, e);
}
initialProperties.putAll(properties);
}
}
private void replacePlaceHolders(Properties properties) {
if (placeHolderUtil != null) {
placeHolderUtil.filter(properties);
}
}
private String replacePlaceHolders(String s) {
if (placeHolderUtil != null) {
s = placeHolderUtil.filter(s);
}
return s;
}
private Schema getSchema() throws SAXException, IOException {
if (schema == null) {
SchemaFactory schemaFactory = SchemaFactory.newInstance(
@ -874,67 +349,4 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
this.checkPeriod = checkPeriod;
}
static public class PropertiesPlaceHolderUtil {
static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}");
final Properties properties;
public PropertiesPlaceHolderUtil(Properties properties) {
this.properties = properties;
}
public void filter(Properties toFilter) {
for (Map.Entry<Object, Object> entry : toFilter.entrySet()) {
String val = (String) entry.getValue();
String newVal = filter(val);
if (!val.equals(newVal)) {
toFilter.put(entry.getKey(), newVal);
}
}
}
public String filter(String str) {
int start = 0;
while (true) {
Matcher matcher = pattern.matcher(str);
if (!matcher.find(start)) {
break;
}
String group = matcher.group(1);
String property = properties.getProperty(group);
if (property != null) {
str = matcher.replaceFirst(Matcher.quoteReplacement(property));
} else {
start = matcher.end();
}
}
return replaceBytePostfix(str);
}
static Pattern[] byteMatchers = new Pattern[] {
Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE),
Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE)};
// xbean can Xb, Xkb, Xmb, Xg etc
private String replaceBytePostfix(String str) {
try {
for (int i=0; i< byteMatchers.length; i++) {
Matcher matcher = byteMatchers[i].matcher(str);
if (matcher.matches()) {
long value = Long.parseLong(matcher.group(1));
for (int j=1; j<=i; j++) {
value *= 1024;
}
return String.valueOf(value);
}
}
} catch (NumberFormatException ignored) {
LOG.debug("nfe on: " + str, ignored);
}
return str;
}
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.security.SimpleAuthenticationBroker;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
public class SimpleAuthenticationPluginProcessor extends DefaultConfigurationProcessor {
public SimpleAuthenticationPluginProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void modify(Object existing, Object candidate) {
try {
final SimpleAuthenticationPlugin updatedPlugin = fromDto(candidate, new SimpleAuthenticationPlugin());
final SimpleAuthenticationBroker authenticationBroker =
(SimpleAuthenticationBroker) plugin.getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class);
plugin.addConnectionWork.add(new Runnable() {
public void run() {
authenticationBroker.setUserGroups(updatedPlugin.getUserGroups());
authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords());
authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed());
authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser());
authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup());
}
});
} catch (Exception e) {
plugin.info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e);
}
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.*;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
import org.apache.activemq.schema.core.DtoVirtualTopic;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoCompositeQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class VirtualDestinationInterceptorProcessor extends DefaultConfigurationProcessor {
public VirtualDestinationInterceptorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
super(plugin, configurationClass);
}
@Override
public void addNew(Object o) {
final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
plugin.addDestinationWork.add(new Runnable() {
public void run() {
boolean updatedExistingInterceptor = false;
RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
for (DestinationInterceptor destinationInterceptor : plugin.getBrokerService().getDestinationInterceptors()) {
if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
// update existing interceptor
final VirtualDestinationInterceptor virtualDestinationInterceptor =
(VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
plugin.info("applied updates to: " + virtualDestinationInterceptor);
updatedExistingInterceptor = true;
}
}
if (!updatedExistingInterceptor) {
// add
VirtualDestinationInterceptor virtualDestinationInterceptor =
new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
interceptorsList.addAll(Arrays.asList(plugin.getBrokerService().getDestinationInterceptors()));
interceptorsList.add(virtualDestinationInterceptor);
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
plugin.info("applied new: " + interceptorsList);
}
regionBroker.reapplyInterceptor();
}
});
}
@Override
public void remove(Object o) {
// whack it
plugin.addDestinationWork.add(new Runnable() {
public void run() {
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) {
if (!(candidate instanceof VirtualDestinationInterceptor)) {
interceptorsList.add(candidate);
}
}
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) ((RegionBroker) plugin.getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
plugin.info("removed VirtualDestinationInterceptor from: " + interceptorsList);
}
});
}
private VirtualDestination[] fromDto(DtoVirtualDestinationInterceptor virtualDestinationInterceptor) {
List<VirtualDestination> answer = new ArrayList<VirtualDestination>();
for (Object vd : filter(virtualDestinationInterceptor, DtoVirtualDestinationInterceptor.VirtualDestinations.class)) {
for (Object vt : filter(vd, DtoVirtualTopic.class)) {
answer.add(fromDto(vt, new VirtualTopic()));
}
for (Object vt : filter(vd, DtoCompositeTopic.class)) {
answer.add(fromDto(vt, new CompositeTopic()));
}
for (Object vt : filter(vd, DtoCompositeQueue.class)) {
answer.add(fromDto(vt, new CompositeQueue()));
}
}
VirtualDestination[] array = new VirtualDestination[answer.size()];
answer.toArray(array);
return array;
}
}

View File

@ -36,6 +36,10 @@
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice/xs:choice/xs:element[@name='destinations']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='queue']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Set;
import static org.junit.Assert.assertTrue;
public class DestinationsTest extends RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(DestinationsTest.class);
@Test
public void testMod() throws Exception {
String configurationSeed = "destinationTest";
final String brokerConfig = configurationSeed + "-destinations";
applyNewConfig(brokerConfig, configurationSeed + "-original");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
printDestinations();
assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
LOG.info("Adding destinations");
applyNewConfig(brokerConfig, configurationSeed + "-add", SLEEP);
printDestinations();
assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
assertTrue("contains before", containsDestination(new ActiveMQTopic("BEFORE")));
assertTrue("contains after", containsDestination(new ActiveMQQueue("AFTER")));
LOG.info("Removing destinations");
applyNewConfig(brokerConfig, configurationSeed + "-remove", SLEEP);
printDestinations();
assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
assertTrue("contains before", containsDestination(new ActiveMQTopic("BEFORE")));
assertTrue("contains after", containsDestination(new ActiveMQQueue("AFTER")));
}
protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
return Arrays.asList(brokerService.getRegionBroker().getDestinations()).contains(destination);
}
protected void printDestinations() throws Exception {
ActiveMQDestination[] destinations = brokerService.getRegionBroker().getDestinations();
for (ActiveMQDestination destination : destinations) {
LOG.info("Broker destination: " + destination.toString());
}
}
}

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<destinations>
<topic physicalName="BEFORE"/>
<queue physicalName="ORIGINAL"/>
<queue physicalName="AFTER"/>
</destinations>
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
</broker>
</beans>

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<destinations>
<queue physicalName="ORIGINAL"/>
</destinations>
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<destinations>
<topic physicalName="BEFORE"/>
<queue physicalName="AFTER"/>
</destinations>
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
</broker>
</beans>