Adding a new JavaRuntimeConfigurationBroker which allows dynamic
changes to parts of the broker through a Java api instead of
just through xml configuration.  This is useful if starting a broker
with java config and not using xml.  It is also useful for temporary
changes that shouldn't be persisted.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-10 15:19:26 +00:00
parent a01578ad4c
commit 43c3cae2c0
21 changed files with 1932 additions and 332 deletions

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
public static final Logger LOG = LoggerFactory.getLogger(AbstractRuntimeConfigurationBroker.class);
protected final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
protected final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock();
protected Runnable monitorTask;
protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
protected ObjectName objectName;
protected String infoString;
public AbstractRuntimeConfigurationBroker(Broker next) {
super(next);
}
@Override
public void start() throws Exception {
super.start();
}
@Override
public void stop() throws Exception {
if (monitorTask != null) {
try {
this.getBrokerService().getScheduler().cancel(monitorTask);
} catch (Exception letsNotStopStop) {
LOG.warn("Failed to cancel config monitor task", letsNotStopStop);
}
}
unregisterMbean();
super.stop();
}
protected void registerMbean() {
}
protected void unregisterMbean() {
}
// modification to virtual destinations interceptor needs exclusive access to destination add
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
Runnable work = addDestinationWork.poll();
if (work != null) {
try {
addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addDestinationWork.poll();
} while (work != null);
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.writeLock().unlock();
}
} else {
try {
addDestinationBarrier.readLock().lockInterruptibly();
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.readLock().unlock();
}
}
}
// modification to authentication plugin needs exclusive access to connection add
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
Runnable work = addConnectionWork.poll();
if (work != null) {
try {
addConnectionBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addConnectionWork.poll();
} while (work != null);
super.addConnection(context, info);
} finally {
addConnectionBarrier.writeLock().unlock();
}
} else {
try {
addConnectionBarrier.readLock().lockInterruptibly();
super.addConnection(context, info);
} finally {
addConnectionBarrier.readLock().unlock();
}
}
}
protected void debug(String s) {
LOG.debug(s);
}
protected void info(String s) {
LOG.info(filterPasswords(s));
if (infoString != null) {
infoString += s;
infoString += ";";
}
}
protected void info(String s, Throwable t) {
LOG.info(filterPasswords(s), t);
if (infoString != null) {
infoString += s;
infoString += ", " + t;
infoString += ";";
}
}
Pattern matchPassword = Pattern.compile("password=.*,");
protected String filterPasswords(Object toEscape) {
return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
}
}

View File

@ -16,23 +16,10 @@
*/
package org.apache.activemq.plugin;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.spring.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Properties;
import javax.management.JMException;
import javax.management.ObjectName;
@ -48,30 +35,29 @@ import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
public class RuntimeConfigurationBroker extends BrokerFilter {
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.spring.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;
public class RuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker {
public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class);
public static final String objectNamePropsAppendage = ",service=RuntimeConfiguration,name=Plugin";
private final ReentrantReadWriteLock addDestinationBarrier = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock addConnectionBarrier = new ReentrantReadWriteLock();
PropertiesPlaceHolderUtil placeHolderUtil = null;
private long checkPeriod;
private long lastModified = -1;
private Resource configToMonitor;
private DtoBroker currentConfiguration;
private Runnable monitorTask;
protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
private ObjectName objectName;
private String infoString;
private Schema schema;
public RuntimeConfigurationBroker(Broker next) {
@ -99,19 +85,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
@Override
public void stop() throws Exception {
if (monitorTask != null) {
try {
this.getBrokerService().getScheduler().cancel(monitorTask);
} catch (Exception letsNotStopStop) {
LOG.warn("Failed to cancel config monitor task", letsNotStopStop);
}
}
unregisterMbean();
super.stop();
}
private void registerMbean() {
protected void registerMbean() {
if (getBrokerService().isUseJmx()) {
ManagementContext managementContext = getBrokerService().getManagementContext();
try {
@ -123,7 +97,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
private void unregisterMbean() {
@Override
protected void unregisterMbean() {
if (objectName != null) {
try {
getBrokerService().getManagementContext().unregisterMBean(objectName);
@ -132,56 +107,6 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
// modification to virtual destinations interceptor needs exclusive access to destination add
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
Runnable work = addDestinationWork.poll();
if (work != null) {
try {
addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addDestinationWork.poll();
} while (work != null);
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.writeLock().unlock();
}
} else {
try {
addDestinationBarrier.readLock().lockInterruptibly();
return super.addDestination(context, destination, createIfTemporary);
} finally {
addDestinationBarrier.readLock().unlock();
}
}
}
// modification to authentication plugin needs exclusive access to connection add
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
Runnable work = addConnectionWork.poll();
if (work != null) {
try {
addConnectionBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addConnectionWork.poll();
} while (work != null);
super.addConnection(context, info);
} finally {
addConnectionBarrier.writeLock().unlock();
}
} else {
try {
addConnectionBarrier.readLock().lockInterruptibly();
super.addConnection(context, info);
} finally {
addConnectionBarrier.readLock().unlock();
}
}
}
public String updateNow() {
LOG.info("Manual configuration update triggered");
infoString = "";
@ -210,26 +135,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
protected void debug(String s) {
LOG.debug(s);
}
protected void info(String s) {
LOG.info(filterPasswords(s));
if (infoString != null) {
infoString += s;
infoString += ";";
}
}
protected void info(String s, Throwable t) {
LOG.info(filterPasswords(s), t);
if (infoString != null) {
infoString += s;
infoString += ", " + t;
infoString += ";";
}
}
private void applyModifications(Resource configToMonitor) {
DtoBroker changed = loadConfiguration(configToMonitor);
@ -261,10 +167,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
processor.processChanges(currentConfiguration, modifiedConfiguration);
}
Pattern matchPassword = Pattern.compile("password=.*,");
private String filterPasswords(Object toEscape) {
return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
}
private DtoBroker loadConfiguration(Resource configToMonitor) {
DtoBroker jaxbConfig = null;

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
public abstract class UpdateVirtualDestinationsTask implements Runnable {
private final AbstractRuntimeConfigurationBroker plugin;
public UpdateVirtualDestinationsTask(
AbstractRuntimeConfigurationBroker plugin) {
super();
this.plugin = plugin;
}
@Override
public void run() {
boolean updatedExistingInterceptor = false;
RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService()
.getRegionBroker();
for (DestinationInterceptor destinationInterceptor : plugin
.getBrokerService().getDestinationInterceptors()) {
if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
// update existing interceptor
final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor
.setVirtualDestinations(getVirtualDestinations());
plugin.info("applied updates to: "
+ virtualDestinationInterceptor);
updatedExistingInterceptor = true;
}
}
if (!updatedExistingInterceptor) {
// add
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(getVirtualDestinations());
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
interceptorsList.addAll(Arrays.asList(plugin.getBrokerService()
.getDestinationInterceptors()));
interceptorsList.add(virtualDestinationInterceptor);
DestinationInterceptor[] destinationInterceptors = interceptorsList
.toArray(new DestinationInterceptor[] {});
plugin.getBrokerService().setDestinationInterceptors(
destinationInterceptors);
((CompositeDestinationInterceptor) regionBroker
.getDestinationInterceptor())
.setInterceptors(destinationInterceptors);
plugin.info("applied new: " + interceptorsList);
}
regionBroker.reapplyInterceptor();
}
protected abstract VirtualDestination[] getVirtualDestinations();
}

View File

@ -16,18 +16,21 @@
*/
package org.apache.activemq.plugin;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.*;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.schema.core.DtoCompositeQueue;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
import org.apache.activemq.schema.core.DtoVirtualTopic;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoCompositeQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class VirtualDestinationInterceptorProcessor extends DefaultConfigurationProcessor {
@ -38,49 +41,23 @@ public class VirtualDestinationInterceptorProcessor extends DefaultConfiguration
@Override
public void addNew(Object o) {
final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
plugin.addDestinationWork.add(new Runnable() {
public void run() {
boolean updatedExistingInterceptor = false;
RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
plugin.addDestinationWork.add(new UpdateVirtualDestinationsTask(plugin) {
for (DestinationInterceptor destinationInterceptor : plugin.getBrokerService().getDestinationInterceptors()) {
if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
// update existing interceptor
final VirtualDestinationInterceptor virtualDestinationInterceptor =
(VirtualDestinationInterceptor) destinationInterceptor;
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
plugin.info("applied updates to: " + virtualDestinationInterceptor);
updatedExistingInterceptor = true;
}
}
if (!updatedExistingInterceptor) {
// add
VirtualDestinationInterceptor virtualDestinationInterceptor =
new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
interceptorsList.addAll(Arrays.asList(plugin.getBrokerService().getDestinationInterceptors()));
interceptorsList.add(virtualDestinationInterceptor);
DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors);
((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
plugin.info("applied new: " + interceptorsList);
}
regionBroker.reapplyInterceptor();
@Override
protected VirtualDestination[] getVirtualDestinations() {
return fromDto(dto);
}
});
}
@Override
public void remove(Object o) {
// whack it
plugin.addDestinationWork.add(new Runnable() {
@Override
public void run() {
List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) {
@ -113,4 +90,6 @@ public class VirtualDestinationInterceptorProcessor extends DefaultConfiguration
answer.toArray(array);
return array;
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin.java;
import java.util.Arrays;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker;
import org.apache.activemq.plugin.UpdateVirtualDestinationsTask;
import org.apache.activemq.security.AuthorizationBroker;
import org.apache.activemq.security.AuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationBroker;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfigurationBroker {
/**
* @param next
*/
public JavaRuntimeConfigurationBroker(Broker next) {
super(next);
}
public static final Logger LOG = LoggerFactory.getLogger(JavaRuntimeConfigurationBroker.class);
//Virtual Destinations
public void setVirtualDestinations(final VirtualDestination[] virtualDestinations) {
this.addDestinationWork.add(new UpdateVirtualDestinationsTask(this) {
@Override
protected VirtualDestination[] getVirtualDestinations() {
return virtualDestinations;
}
});
}
//New Destinations
public void setDestinations(final ActiveMQDestination[] destinations) {
for (ActiveMQDestination destination : destinations) {
try {
if (!containsDestination(destination)) {
this.addDestination(this.getBrokerService().getAdminConnectionContext(), destination, true);
this.info("Added destination " + destination);
}
} catch (Exception e) {
this.info("Failed to add a new destination for: " + destination, e);
}
}
}
protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
return Arrays.asList(this.getBrokerService().getRegionBroker().getDestinations()).contains(destination);
}
public void addNewDestination(ActiveMQDestination destination) {
try {
this.addDestination(this.getBrokerService().getAdminConnectionContext(), destination, true);
this.info("Added destination " + destination);
} catch (Exception e) {
this.info("Failed to add a new destination for: " + destination, e);
}
}
//Network Connectors
public void addNetworkConnector(final DiscoveryNetworkConnector nc) {
try {
if (!getBrokerService().getNetworkConnectors().contains(nc)) {
getBrokerService().addNetworkConnector(nc);
nc.start();
info("started new network connector: " + nc);
} else {
info("skipping network connector add, already exists: " + nc);
}
} catch (Exception e) {
info("Failed to add new networkConnector " + nc, e);
}
}
public void updateNetworkConnector(final DiscoveryNetworkConnector nc) {
removeNetworkConnector(nc);
addNetworkConnector(nc);
}
public void removeNetworkConnector(final DiscoveryNetworkConnector existingCandidate) {
if (getBrokerService().removeNetworkConnector(existingCandidate)) {
try {
existingCandidate.stop();
info("stopped and removed networkConnector: " + existingCandidate);
} catch (Exception e) {
info("Failed to stop removed network connector: " + existingCandidate);
}
}
}
//Policy entries
public void addNewPolicyEntry(PolicyEntry addition) {
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
existingMap.put(addition.getDestination(), addition);
applyRetrospectively(addition);
info("added policy for: " + addition.getDestination());
}
public void modifyPolicyEntry(PolicyEntry existing) {
PolicyMap existingMap = this.getBrokerService().getDestinationPolicy();
Set<?> existingEntry = existingMap.get(existing.getDestination());
if (existingEntry.size() == 1) {
applyRetrospectively(existing);
this.info("updated policy for: " + existing.getDestination());
} else {
this.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + existing.getDestination());
}
}
protected void applyRetrospectively(PolicyEntry updatedEntry) {
RegionBroker regionBroker = (RegionBroker) this.getBrokerService().getRegionBroker();
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
Destination target = destination;
if (destination instanceof DestinationFilter) {
target = ((DestinationFilter)destination).getNext();
}
if (target.getActiveMQDestination().isQueue()) {
updatedEntry.update((Queue) target);
} else if (target.getActiveMQDestination().isTopic()) {
updatedEntry.update((Topic) target);
}
this.debug("applied update to:" + target);
}
}
//authentication plugin
public void updateSimpleAuthenticationPlugin(final SimpleAuthenticationPlugin updatedPlugin) {
try {
final SimpleAuthenticationBroker authenticationBroker =
(SimpleAuthenticationBroker) getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class);
addConnectionWork.add(new Runnable() {
@Override
public void run() {
authenticationBroker.setUserGroups(updatedPlugin.getUserGroups());
authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords());
authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed());
authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser());
authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup());
}
});
} catch (Exception e) {
info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e);
}
}
//authorization map
public void updateAuthorizationMap(final AuthorizationMap authorizationMap) {
try {
// replace authorization map - need exclusive write lock to total broker
AuthorizationBroker authorizationBroker =
(AuthorizationBroker) getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class);
authorizationBroker.setAuthorizationMap(authorizationMap);
} catch (Exception e) {
info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin.java;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class JavaRuntimeConfigurationPlugin implements BrokerPlugin {
public static final Logger LOG = LoggerFactory.getLogger(JavaRuntimeConfigurationPlugin.class);
private JavaRuntimeConfigurationBroker runtimeConfigurationBroker;
@Override
public Broker installPlugin(Broker broker) throws Exception {
LOG.info("installing javaRuntimeConfiguration plugin");
runtimeConfigurationBroker = new JavaRuntimeConfigurationBroker(broker);
return runtimeConfigurationBroker;
}
public JavaRuntimeConfigurationBroker getBroker() {
return runtimeConfigurationBroker;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.plugin.jmx;
import java.util.Date;
import org.apache.activemq.plugin.RuntimeConfigurationBroker;
import org.springframework.core.io.Resource;

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import static org.junit.Assert.fail;
import javax.jms.JMSException;
import javax.jms.Session;
public abstract class AbstractAuthorizationTest extends RuntimeConfigTestSupport {
protected void assertDeniedTemp(String userPass) {
try {
assertAllowedTemp(userPass);
fail("Expected not allowed exception");
} catch (Exception expected) {
LOG.debug("got:" + expected, expected);
}
}
protected void assertAllowedTemp(String userPass) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTemporaryQueue());
} finally {
connection.close();
}
}
protected void assertDenied(String userPass, String destination) {
try {
assertAllowed(userPass, destination);
fail("Expected not allowed exception");
} catch (JMSException expected) {
LOG.debug("got:" + expected, expected);
}
}
protected void assertAllowed(String userPass, String dest) throws JMSException {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
} finally {
connection.close();
}
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.Collections;
import java.util.Map;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
*
*
*/
public abstract class AbstractVirtualDestTest extends RuntimeConfigTestSupport {
protected void forceAddDestination(String dest) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue("Consumer.A." + dest));
connection.close();
}
protected void exerciseVirtualTopic(String topic) throws Exception {
exerciseVirtualTopic("Consumer.A.", topic);
}
protected void exerciseVirtualTopic(String prefix, String topic) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(prefix + topic));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createTopic(topic));
final String body = "To vt:" + topic;
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
protected void exerciseCompositeQueue(String dest, String consumerQ) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerQ));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
final String body = "To cq:" + dest;
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
protected void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
// positive test
String body = "To filtered cq:" + dest;
Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue));
assertNotNull("The message did not reach the destination even though it should pass through the filter.", message);
assertEquals("Did not get expected message", body, ((TextMessage) message).getText());
// negative test
message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse"));
assertNull("The message reached the destination, but it should have been removed by the filter.", message);
connection.close();
}
protected Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody) throws Exception {
return sendAndReceiveMessage(session, consumer, producer, messageBody,
null);
}
protected Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody, Map<String, String> propertiesMap)
throws Exception {
TextMessage messageToSend = session.createTextMessage(messageBody);
if (propertiesMap != null) {
for (String headerKey : propertiesMap.keySet()) {
messageToSend.setStringProperty(headerKey,
propertiesMap.get(headerKey));
}
}
producer.send(messageToSend);
LOG.info("sent to: " + producer.getDestination());
Message message = null;
for (int i = 0; i < 10 && message == null; i++) {
message = consumer.receive(1000);
}
return message;
}
}

View File

@ -16,18 +16,11 @@
*/
package org.apache.activemq;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;
public class AuthorizationTest extends RuntimeConfigTestSupport {
public class AuthorizationTest extends AbstractAuthorizationTest {
private static final int RECEIVE_TIMEOUT = 1000;
String configurationSeed = "authorizationTest";
@ -104,45 +97,4 @@ public class AuthorizationTest extends RuntimeConfigTestSupport {
assertAllowedTemp("guest");
}
private void assertDeniedTemp(String userPass) {
try {
assertAllowedTemp(userPass);
fail("Expected not allowed exception");
} catch (Exception expected) {
LOG.debug("got:" + expected, expected);
}
}
private void assertAllowedTemp(String userPass) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTemporaryQueue());
} finally {
connection.close();
}
}
private void assertDenied(String userPass, String destination) {
try {
assertAllowed(userPass, destination);
fail("Expected not allowed exception");
} catch (JMSException expected) {
LOG.debug("got:" + expected, expected);
}
}
private void assertAllowed(String userPass, String dest) throws JMSException {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(userPass, userPass);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
} finally {
connection.close();
}
}
}

View File

@ -16,7 +16,10 @@
*/
package org.apache.activemq;
import org.apache.activemq.broker.region.Destination;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@ -24,11 +27,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Set;
import static org.junit.Assert.assertTrue;
public class DestinationsTest extends RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(DestinationsTest.class);

View File

@ -16,19 +16,19 @@
*/
package org.apache.activemq;
import java.util.HashMap;
import javax.management.ObjectName;
import org.apache.activemq.plugin.RuntimeConfigurationBroker;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationViewMBean;
import org.apache.activemq.util.IntrospectionSupport;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import javax.management.ObjectName;
import org.apache.activemq.plugin.RuntimeConfigurationBroker;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationViewMBean;
import org.apache.activemq.util.IntrospectionSupport;
import org.junit.Test;
public class MBeanTest extends RuntimeConfigTestSupport {
@Test

View File

@ -16,17 +16,14 @@
*/
package org.apache.activemq;
import java.util.List;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.Test;
public class NetworkConnectorTest extends RuntimeConfigTestSupport {
String configurationSeed = "networkConnectorTest";
@ -63,7 +60,6 @@ public class NetworkConnectorTest extends RuntimeConfigTestSupport {
}
@Test
public void testMod() throws Exception {

View File

@ -37,7 +37,7 @@ public class RuntimeConfigTestSupport {
public static final int SLEEP = 4; // seconds
public static final String EMPTY_UPDATABLE_CONFIG = "emptyUpdatableConfig1000" ;
BrokerService brokerService;
protected BrokerService brokerService;
@Rule
public TestWatcher watchman = new TestWatcher() {

View File

@ -16,22 +16,18 @@
*/
package org.apache.activemq;
import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import static org.junit.Assert.*;
public class VirtualDestTest extends RuntimeConfigTestSupport {
public class VirtualDestTest extends AbstractVirtualDestTest {
String configurationSeed = "virtualDestTest";
@ -183,6 +179,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
forceAddDestination("AnyDest");
assertTrue("getDestinationInterceptors empty on time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() {
return 0 == brokerService.getDestinationInterceptors().length;
}
@ -235,7 +232,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testNewFilteredComposite() throws Exception {
final String brokerConfig = configurationSeed + "-new-filtered-composite-vd-broker";
@ -246,7 +243,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd", SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes");
}
}
@Test
public void testModFilteredComposite() throws Exception {
@ -259,96 +256,9 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
applyNewConfig(brokerConfig, configurationSeed + "-mod-filtered-composite-vd", SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
}
private void forceAddDestination(String dest) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue("Consumer.A." + dest));
connection.close();
}
private void exerciseVirtualTopic(String topic) throws Exception {
exerciseVirtualTopic("Consumer.A.", topic);
}
private void exerciseVirtualTopic(String prefix, String topic) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(prefix + topic));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createTopic(topic));
final String body = "To vt:" + topic;
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
private void exerciseCompositeQueue(String dest, String consumerQ) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerQ));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
final String body = "To cq:" + dest;
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
private void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
// positive test
String body = "To filtered cq:" + dest;
Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue));
assertNotNull("The message did not reach the destination even though it should pass through the filter.", message);
assertEquals("Did not get expected message", body, ((TextMessage) message).getText());
// negative test
message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse"));
assertNull("The message reached the destination, but it should have been removed by the filter.", message);
connection.close();
}
private Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody) throws Exception {
return sendAndReceiveMessage(session, consumer, producer, messageBody, null);
}
private Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody, Map<String, String> propertiesMap)
throws Exception {
TextMessage messageToSend = session.createTextMessage(messageBody);
if (propertiesMap != null) {
for (String headerKey : propertiesMap.keySet()) {
messageToSend.setStringProperty(headerKey, propertiesMap.get(headerKey));
}
}
producer.send(messageToSend);
LOG.info("sent to: " + producer.getDestination());
Message message = null;
for (int i = 0; i < 10 && message == null; i++) {
message = consumer.receive(1000);
}
return message;
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.junit.Test;
public class JavaAuthenticationTest extends RuntimeConfigTestSupport {
public static final int SLEEP = 2; // seconds
private JavaRuntimeConfigurationBroker javaConfigBroker;
private SimpleAuthenticationPlugin authenticationPlugin;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
authenticationPlugin = new SimpleAuthenticationPlugin();
authenticationPlugin.setAnonymousAccessAllowed(false);
authenticationPlugin.setAnonymousGroup("ag");
authenticationPlugin.setAnonymousUser("au");
List<AuthenticationUser> users = new ArrayList<>();
users.add(new AuthenticationUser("test_user_password", "test_user_password", "users"));
authenticationPlugin.setUsers(users);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap();
authorizationPlugin.setMap(authorizationMap);
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> entries = new ArrayList<>();
entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users"));
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("tempDestinationAdmins");
tempEntry.setWrite("tempDestinationAdmins");
tempEntry.setAdmin("tempDestinationAdmins");
authorizationMap.setAuthorizationEntries(entries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin(),
authenticationPlugin, authorizationPlugin});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testMod() throws Exception {
BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertAllowed("test_user_password", "USERS.A");
assertDenied("another_test_user_password", "USERS.A");
// anonymous
assertDenied(null, "USERS.A");
List<AuthenticationUser> users = new ArrayList<>();
users.add(new AuthenticationUser("test_user_password", "test_user_password", "users"));
users.add(new AuthenticationUser("another_test_user_password", "another_test_user_password", "users"));
authenticationPlugin.setAnonymousGroup("users");
authenticationPlugin.setUsers(users);
authenticationPlugin.setAnonymousAccessAllowed(true);
javaConfigBroker.updateSimpleAuthenticationPlugin(authenticationPlugin);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllowed("test_user_password", "USERS.A");
assertAllowed("another_test_user_password", "USERS.A");
assertAllowed(null, "USERS.A");
}
private void assertDenied(String userPass, String destination) {
try {
assertAllowed(userPass, destination);
fail("Expected not allowed exception");
} catch (JMSException expected) {
LOG.debug("got:" + expected, expected);
}
}
private void assertAllowed(String userPass, String dest) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(userPass, userPass);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
} finally {
connection.close();
}
}
private AuthorizationEntry buildQueueAuthorizationEntry(String queue, String read, String write, String admin) throws Exception {
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(queue);
entry.setRead(read);
entry.setWrite(write);
entry.setAdmin(admin);
return entry;
}
private AuthorizationEntry buildTopicAuthorizationEntry(String topic, String read, String write, String admin) throws Exception {
AuthorizationEntry entry = new AuthorizationEntry();
entry.setTopic(topic);
entry.setRead(read);
entry.setWrite(write);
entry.setAdmin(admin);
return entry;
}
}

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.AbstractAuthorizationTest;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.JaasAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.junit.Test;
public class JavaAuthorizationTest extends AbstractAuthorizationTest {
public static final int SLEEP = 2; // seconds
String configurationSeed = "authorizationTest";
private JavaRuntimeConfigurationBroker javaConfigBroker;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
JaasAuthenticationPlugin authenticationPlugin = new JaasAuthenticationPlugin();
authenticationPlugin.setConfiguration("activemq-domain");
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap();
authorizationPlugin.setMap(authorizationMap);
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin(),
authenticationPlugin, authorizationPlugin});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testMod() throws Exception {
DefaultAuthorizationMap authorizationMap = buildUsersMap();
BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
javaConfigBroker.updateAuthorizationMap(authorizationMap);
assertAllowed("user", "USERS.A");
assertDenied("user", "GUESTS.A");
assertDeniedTemp("guest");
// applyNewConfig(brokerConfig, configurationSeed + "-users-guests", SLEEP);
authorizationMap = buildUsersGuestsMap();
javaConfigBroker.updateAuthorizationMap(authorizationMap);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllowed("user", "USERS.A");
assertAllowed("guest", "GUESTS.A");
assertDenied("user", "GUESTS.A");
assertAllowedTemp("guest");
}
@Test
public void testModRm() throws Exception {
DefaultAuthorizationMap authorizationMap = buildUsersGuestsMap();
BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
javaConfigBroker.updateAuthorizationMap(authorizationMap);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllowed("user", "USERS.A");
assertAllowed("guest", "GUESTS.A");
assertDenied("user", "GUESTS.A");
assertAllowedTemp("guest");
authorizationMap = buildUsersMap();
javaConfigBroker.updateAuthorizationMap(authorizationMap);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllowed("user", "USERS.A");
assertDenied("user", "GUESTS.A");
assertDeniedTemp("guest");
}
@Test
public void testWildcard() throws Exception {
DefaultAuthorizationMap authorizationMap = buildWildcardUsersGuestsMap();
BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
javaConfigBroker.updateAuthorizationMap(authorizationMap);
TimeUnit.SECONDS.sleep(SLEEP);
final String ALL_USERS = "ALL.USERS.>";
final String ALL_GUESTS = "ALL.GUESTS.>";
assertAllowed("user", ALL_USERS);
assertAllowed("guest", ALL_GUESTS);
assertDenied("user", ALL_USERS + "," + ALL_GUESTS);
assertDenied("guest", ALL_GUESTS + "," + ALL_USERS);
final String ALL_PREFIX = "ALL.>";
assertDenied("user", ALL_PREFIX);
assertDenied("guest", ALL_PREFIX);
assertAllowed("user", "ALL.USERS.A");
assertAllowed("user", "ALL.USERS.A,ALL.USERS.B");
assertAllowed("guest", "ALL.GUESTS.A");
assertAllowed("guest", "ALL.GUESTS.A,ALL.GUESTS.B");
assertDenied("user", "USERS.>");
assertDenied("guest", "GUESTS.>");
assertAllowedTemp("guest");
}
/**
* @return
* @throws Exception
*/
private DefaultAuthorizationMap buildWildcardUsersGuestsMap() throws Exception {
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap();
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> entries = new ArrayList<>();
entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildQueueAuthorizationEntry("ALL.USERS.>", "users", "users", "users"));
entries.add(buildQueueAuthorizationEntry("ALL.GUESTS.>", "guests", "guests,users", "guests,users"));
entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildTopicAuthorizationEntry("ALL.USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry("ALL.GUESTS.>", "guests", "guests,users", "guests,users"));
entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users"));
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("tempDestinationAdmins,guests");
tempEntry.setWrite("tempDestinationAdmins,guests");
tempEntry.setAdmin("tempDestinationAdmins,guests");
authorizationMap.setAuthorizationEntries(entries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
return authorizationMap;
}
private DefaultAuthorizationMap buildUsersMap() throws Exception {
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap();
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> entries = new ArrayList<>();
entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users"));
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("tempDestinationAdmins");
tempEntry.setWrite("tempDestinationAdmins");
tempEntry.setAdmin("tempDestinationAdmins");
authorizationMap.setAuthorizationEntries(entries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
return authorizationMap;
}
private DefaultAuthorizationMap buildUsersGuestsMap() throws Exception {
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap();
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> entries = new ArrayList<>();
entries.add(buildQueueAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildQueueAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildQueueAuthorizationEntry("GUESTS.>", "guests", "guests,users", "guests,users"));
entries.add(buildTopicAuthorizationEntry(">", "admins", "admins", "admins"));
entries.add(buildTopicAuthorizationEntry("USERS.>", "users", "users", "users"));
entries.add(buildTopicAuthorizationEntry("GUESTS.>", "guests", "guests,users", "guests,users"));
entries.add(buildTopicAuthorizationEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users"));
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("tempDestinationAdmins,guests");
tempEntry.setWrite("tempDestinationAdmins,guests");
tempEntry.setAdmin("tempDestinationAdmins,guests");
authorizationMap.setAuthorizationEntries(entries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
return authorizationMap;
}
private AuthorizationEntry buildQueueAuthorizationEntry(String queue, String read, String write, String admin) throws Exception {
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(queue);
entry.setRead(read);
entry.setWrite(write);
entry.setAdmin(admin);
return entry;
}
private AuthorizationEntry buildTopicAuthorizationEntry(String topic, String read, String write, String admin) throws Exception {
AuthorizationEntry entry = new AuthorizationEntry();
entry.setTopic(topic);
entry.setRead(read);
entry.setWrite(write);
entry.setAdmin(admin);
return entry;
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JavaDestinationsTest extends RuntimeConfigTestSupport {
public static final Logger LOG = LoggerFactory.getLogger(JavaDestinationsTest.class);
private JavaRuntimeConfigurationBroker javaConfigBroker;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testMod() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setDestinations(new ActiveMQDestination[] {new ActiveMQQueue("ORIGINAL")});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
printDestinations();
assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
LOG.info("Adding destinations");
//apply new config
javaConfigBroker.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic("BEFORE"), new ActiveMQQueue("ORIGINAL"), new ActiveMQQueue("AFTER")});
printDestinations();
assertTrue("contains destinations", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return containsDestination(new ActiveMQQueue("ORIGINAL"))
&& containsDestination(new ActiveMQTopic("BEFORE"))
&& containsDestination(new ActiveMQQueue("AFTER"));
}
}, TimeUnit.MILLISECONDS.convert(SLEEP, TimeUnit.SECONDS)));
LOG.info("Removing destinations");
//apply new config
javaConfigBroker.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic("BEFORE"), new ActiveMQQueue("AFTER")});
printDestinations();
assertTrue("contains destinations", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return containsDestination(new ActiveMQQueue("ORIGINAL"))
&& containsDestination(new ActiveMQTopic("BEFORE"))
&& containsDestination(new ActiveMQQueue("AFTER"));
}
}, TimeUnit.MILLISECONDS.convert(SLEEP, TimeUnit.SECONDS)));
}
protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
return Arrays.asList(brokerService.getRegionBroker().getDestinations()).contains(destination);
}
protected void printDestinations() throws Exception {
ActiveMQDestination[] destinations = brokerService.getRegionBroker().getDestinations();
for (ActiveMQDestination destination : destinations) {
LOG.info("Broker destination: " + destination.toString());
}
}
}

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.Test;
public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
public static final int SLEEP = 2; // seconds
private JavaRuntimeConfigurationBroker javaConfigBroker;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testNew() throws Exception {
final BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
DiscoveryNetworkConnector nc = createNetworkConnector();
javaConfigBroker.addNetworkConnector(nc);
assertTrue("new network connectors", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getNetworkConnectors().size();
}
}));
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
javaConfigBroker.addNetworkConnector(nc);
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertSame("same instance", networkConnector, brokerService.getNetworkConnectors().get(0));
// verify nested elements
assertEquals("has exclusions", 2, networkConnector.getExcludedDestinations().size());
assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size());
assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size());
assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
}
@Test
public void testMod() throws Exception {
final BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
DiscoveryNetworkConnector nc = createNetworkConnector();
javaConfigBroker.addNetworkConnector(nc);
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("one network connectors", 1, brokerService.getNetworkConnectors().size());
// track the original
NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL());
nc.setNetworkTTL(2);
javaConfigBroker.updateNetworkConnector(nc);
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("still one network connectors", 1, brokerService.getNetworkConnectors().size());
NetworkConnector modNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("got ttl update", 2, modNetworkConnector.getNetworkTTL());
// apply again - ensure no change
javaConfigBroker.updateNetworkConnector(nc);
assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
}
@Test
public void testRemove() throws Exception {
final BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
DiscoveryNetworkConnector nc1 = new DiscoveryNetworkConnector();
nc1.setUri(new URI("static:(tcp://localhost:5555)"));
nc1.setNetworkTTL(1);
nc1.setName("one");
DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector();
nc2.setUri(new URI("static:(tcp://localhost:5555)"));
nc2.setNetworkTTL(1);
nc2.setName("one");
javaConfigBroker.addNetworkConnector(nc1);
javaConfigBroker.addNetworkConnector(nc2);
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
javaConfigBroker.removeNetworkConnector(nc2);
assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getNetworkConnectors().size();
}
}));
NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
assertEquals("name match", "one", remainingNetworkConnector.getName());
}
private DiscoveryNetworkConnector createNetworkConnector() throws Exception {
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector();
nc.setUri(new URI("static:(tcp://localhost:5555)"));
nc.setNetworkTTL(1);
nc.setName("one");
nc.setExcludedDestinations(Arrays.asList(new ActiveMQTopic("LAN.>"), new ActiveMQQueue("LAN.>")));
nc.setDynamicallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue("DynamicallyIncluded.*")));
nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("StaticallyIncluded.*")));
nc.setDurableDestinations(new HashSet<>(Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("durableDest"))));
return nc;
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.junit.Test;
public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
public static final int SLEEP = 2; // seconds
private JavaRuntimeConfigurationBroker javaConfigBroker;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testMod() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
//Reapply new limit
entry.setMemoryLimit(4194304);
javaConfigBroker.modifyPolicyEntry(entry);
TimeUnit.SECONDS.sleep(SLEEP);
verifyQueueLimit("After", 4194304);
// change to existing dest
verifyQueueLimit("Before", 4194304);
}
@Test
public void testAddNdMod() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit());
entry.setMemoryLimit(2048);
javaConfigBroker.modifyPolicyEntry(entry);
TimeUnit.SECONDS.sleep(SLEEP);
PolicyEntry newEntry = new PolicyEntry();
newEntry.setTopic(">");
newEntry.setMemoryLimit(2048);
javaConfigBroker.addNewPolicyEntry(newEntry);
TimeUnit.SECONDS.sleep(SLEEP);
verifyTopicLimit("After", 2048l);
verifyQueueLimit("After", 2048);
// change to existing dest
verifyTopicLimit("Before", 2048l);
}
private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit());
} finally {
connection.close();
}
}
private void verifyTopicLimit(String dest, long memoryLimit) throws Exception {
ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTopic(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit());
} finally {
connection.close();
}
}
}

View File

@ -0,0 +1,376 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.java;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.AbstractVirtualDestTest;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.FilteredDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.Test;
public class JavaVirtualDestTest extends AbstractVirtualDestTest {
public static final int SLEEP = 2; // seconds
private JavaRuntimeConfigurationBroker javaConfigBroker;
public void startBroker(BrokerService brokerService) throws Exception {
this.brokerService = brokerService;
brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
brokerService.setPersistent(false);
brokerService.start();
brokerService.waitUntilStarted();
javaConfigBroker =
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
@Test
public void testNew() throws Exception {
startBroker(new BrokerService());
assertTrue("broker alive", brokerService.isStarted());
// default config has support for VirtualTopic.>
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("default names in place", "VirtualTopic.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
exerciseVirtualTopic("VirtualTopic.Default");
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("new names in place", "A.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
// apply again - ensure no change
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
}
@Test
public void testNewComposite() throws Exception {
startBroker(new BrokerService());
assertTrue("broker alive", brokerService.isStarted());
CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue",
Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
new ActiveMQTopic("VirtualDestination.TopicConsumer")));
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
}
@Test
public void testModComposite() throws Exception {
BrokerService brokerService = new BrokerService();
CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue",
Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
new ActiveMQTopic("VirtualDestination.TopicConsumer")));
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{queue})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
//Apply updated config
CompositeQueue newConfig = buildCompositeQueue("VirtualDestination.CompositeQueue", false,
Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
new ActiveMQTopic("VirtualDestination.TopicConsumer")));
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{newConfig});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.CompositeQueue");
}
@Test
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setUseVirtualTopics(false);
startBroker(brokerService);
TimeUnit.SECONDS.sleep(SLEEP);
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 0, interceptors.length);
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
//apply new config again, make sure still just 1 interceptor
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
}
@Test
public void testNewWithMirrorQueueSupport() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setUseMirroredQueues(true);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
TimeUnit.SECONDS.sleep(SLEEP);
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 2, interceptors.length);
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.Default");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 2, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
// apply again - ensure no change
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
}
@Test
public void testRemove() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
assertEquals("one interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
exerciseVirtualTopic("A.Default");
VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
assertEquals("configured names in place", "A.>",
defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
exerciseVirtualTopic("A.Default");
//apply empty config - this removes all virtual destinations from the interceptor
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{});
TimeUnit.SECONDS.sleep(SLEEP);
// update will happen on addDestination
forceAddDestination("AnyDest");
assertTrue("getVirtualDestinations empty on time", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() {
return 0 == ((VirtualDestinationInterceptor)brokerService.getDestinationInterceptors()[0]).
getVirtualDestinations().length;
}
}));
// reverse the remove, add again
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
// update will happen on addDestination
exerciseVirtualTopic("A.NewOne");
interceptors = brokerService.getDestinationInterceptors();
assertEquals("expected interceptor", 1, interceptors.length);
assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor);
}
@Test
public void testMod() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
exerciseVirtualTopic("A.Default");
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("B.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseVirtualTopic("B.Default");
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testModWithMirroredQueue() throws Exception {
final BrokerService brokerService = new BrokerService();
brokerService.setUseMirroredQueues(true);
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
exerciseVirtualTopic("A.Default");
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("B.>", false)});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseVirtualTopic("B.Default");
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testNewFilteredComposite() throws Exception {
final BrokerService brokerService = new BrokerService();
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
FilteredDestination filteredDestination = new FilteredDestination();
filteredDestination.setSelector("odd = 'yes'");
filteredDestination.setQueue("VirtualDestination.QueueConsumer");
CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
Arrays.asList(filteredDestination));
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes");
}
@Test
public void testModFilteredComposite() throws Exception {
final BrokerService brokerService = new BrokerService();
FilteredDestination filteredDestination = new FilteredDestination();
filteredDestination.setSelector("odd = 'yes'");
filteredDestination.setQueue("VirtualDestination.QueueConsumer");
CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
Arrays.asList(filteredDestination));
brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
buildInterceptor(new VirtualDestination[]{queue})});
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes");
filteredDestination = new FilteredDestination();
filteredDestination.setSelector("odd = 'no'");
filteredDestination.setQueue("VirtualDestination.QueueConsumer");
queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
Arrays.asList(filteredDestination));
//apply new config
javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
TimeUnit.SECONDS.sleep(SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
}
protected static CompositeQueue buildCompositeQueue(String name, Collection<?> forwardTo) {
return buildCompositeQueue(name, true, forwardTo);
}
protected static CompositeQueue buildCompositeQueue(String name, boolean forwardOnly,
Collection<?> forwardTo) {
CompositeQueue queue = new CompositeQueue();
queue.setForwardOnly(forwardOnly);
queue.setName(name);
queue.setForwardTo(forwardTo);
return queue;
}
protected static VirtualTopic buildVirtualTopic(String name, boolean selectorAware) {
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setSelectorAware(selectorAware);
virtualTopic.setName(name);
return virtualTopic;
}
protected static VirtualDestinationInterceptor buildInterceptor(VirtualDestination[] virtualDestinations) {
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
virtualDestinationInterceptor.setVirtualDestinations(virtualDestinations);
return virtualDestinationInterceptor;
}
}