mirror of https://github.com/apache/activemq.git
The runtime plugins will now find the exact policy to update which means that a destination can match more than one policy and the policy can still be updated at runtime. The java runtime broker also supports the ability to replace or add a policy entry based on a flag on a new method call.
This commit is contained in:
parent
b7787bf6fb
commit
9e7fae0d83
|
@ -143,11 +143,11 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
|
||||
protected void debug(String s) {
|
||||
public void debug(String s) {
|
||||
LOG.debug(s);
|
||||
}
|
||||
|
||||
protected void info(String s) {
|
||||
public void info(String s) {
|
||||
LOG.info(filterPasswords(s));
|
||||
if (infoString != null) {
|
||||
infoString += s;
|
||||
|
@ -155,7 +155,7 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
|
||||
protected void info(String s, Throwable t) {
|
||||
public void info(String s, Throwable t) {
|
||||
LOG.info(filterPasswords(s), t);
|
||||
if (infoString != null) {
|
||||
infoString += s;
|
||||
|
|
|
@ -16,15 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.plugin;
|
||||
|
||||
import org.apache.activemq.broker.region.*;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.activemq.plugin.util.PolicyEntryUtil;
|
||||
|
||||
public class PolicyEntryProcessor extends DefaultConfigurationProcessor {
|
||||
|
||||
public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
|
||||
public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class<?> configurationClass) {
|
||||
super(plugin, configurationClass);
|
||||
}
|
||||
|
||||
|
@ -39,33 +37,22 @@ public class PolicyEntryProcessor extends DefaultConfigurationProcessor {
|
|||
|
||||
@Override
|
||||
public void modify(Object existing, Object candidate) {
|
||||
PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
|
||||
|
||||
PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
|
||||
|
||||
Set existingEntry = existingMap.get(updatedEntry.getDestination());
|
||||
if (existingEntry.size() == 1) {
|
||||
updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
|
||||
//Look up an existing entry that matches the candidate
|
||||
//First just look up by the destination type to see if anything matches
|
||||
PolicyEntry existingEntry = PolicyEntryUtil.findEntryByDestination(plugin, updatedEntry);
|
||||
if (existingEntry != null) {
|
||||
//if found, update the policy and apply the updates to existing destinations
|
||||
updatedEntry = fromDto(candidate, existingEntry);
|
||||
applyRetrospectively(updatedEntry);
|
||||
plugin.info("updated policy for: " + updatedEntry.getDestination());
|
||||
} else {
|
||||
plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
|
||||
plugin.info("cannot find policy entry candidate to update: " + updatedEntry + ", destination:" + updatedEntry.getDestination());
|
||||
}
|
||||
}
|
||||
|
||||
protected void applyRetrospectively(PolicyEntry updatedEntry) {
|
||||
RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
|
||||
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
|
||||
Destination target = destination;
|
||||
if (destination instanceof DestinationFilter) {
|
||||
target = ((DestinationFilter)destination).getNext();
|
||||
}
|
||||
if (target.getActiveMQDestination().isQueue()) {
|
||||
updatedEntry.update((Queue) target);
|
||||
} else if (target.getActiveMQDestination().isTopic()) {
|
||||
updatedEntry.update((Topic) target);
|
||||
}
|
||||
plugin.debug("applied update to:" + target);
|
||||
}
|
||||
PolicyEntryUtil.applyRetrospectively(plugin, updatedEntry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,14 +17,8 @@
|
|||
package org.apache.activemq.plugin.java;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFilter;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||
|
@ -32,6 +26,7 @@ import org.apache.activemq.command.ActiveMQDestination;
|
|||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker;
|
||||
import org.apache.activemq.plugin.UpdateVirtualDestinationsTask;
|
||||
import org.apache.activemq.plugin.util.PolicyEntryUtil;
|
||||
import org.apache.activemq.security.AuthorizationBroker;
|
||||
import org.apache.activemq.security.AuthorizationMap;
|
||||
import org.apache.activemq.security.SimpleAuthenticationBroker;
|
||||
|
@ -142,32 +137,75 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
|
|||
info("added policy for: " + addition.getDestination());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method will modify an existing policy entry that matches the destination
|
||||
* set on the PolicyEntry passed in.
|
||||
*
|
||||
* The PolicyEntry reference must already be in the PolicyMap or it won't be updated.
|
||||
* To modify the entry the best way is to look up the existing PolicyEntry from the
|
||||
* PolicyMap, make changes to it, and pass it to this method to apply.
|
||||
*
|
||||
* To create or replace an existing entry (if the destination matches), see
|
||||
* {@link #modifyPolicyEntry(PolicyEntry, boolean)
|
||||
*
|
||||
*
|
||||
* @param existing
|
||||
*/
|
||||
public void modifyPolicyEntry(PolicyEntry existing) {
|
||||
modifyPolicyEntry(existing, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will modify an existing policy entry that matches the destination
|
||||
* set on the PolicyEntry passed in. If createOrReplace is true, a new policy
|
||||
* will be created if it doesn't exist and a policy will be replaced in the PolicyMap,
|
||||
* versus modified, if it is a different reference but the destinations for the Policy match.
|
||||
*
|
||||
* If createOrReplace is false, the policy update will only be applied if
|
||||
* the PolicyEntry reference already exists in the PolicyMap.
|
||||
*
|
||||
* @param existing
|
||||
* @param createIfAbsent
|
||||
*/
|
||||
public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) {
|
||||
PolicyMap existingMap = this.getBrokerService().getDestinationPolicy();
|
||||
|
||||
Set<?> existingEntry = existingMap.get(existing.getDestination());
|
||||
if (existingEntry.size() == 1) {
|
||||
applyRetrospectively(existing);
|
||||
this.info("updated policy for: " + existing.getDestination());
|
||||
//First just look up by the destination type to see if anything matches
|
||||
PolicyEntry existingEntry = PolicyEntryUtil.findEntryByDestination(this, existing);
|
||||
|
||||
//handle createOrReplace
|
||||
if (createOrReplace) {
|
||||
//if not found at all, go ahead and insert the policy entry
|
||||
if (existingEntry == null) {
|
||||
existingMap.put(existing.getDestination(), existing);
|
||||
existingEntry = existing;
|
||||
//If found but the objects are different, remove the old policy entry
|
||||
//and replace it with the new one
|
||||
} else if (!existing.equals(existingEntry)) {
|
||||
synchronized(existingMap) {
|
||||
existingMap.remove(existingEntry.getDestination(), existingEntry);
|
||||
existingMap.put(existing.getDestination(), existing);
|
||||
}
|
||||
existingEntry = existing;
|
||||
}
|
||||
}
|
||||
|
||||
//Make sure that at this point the passed in object and the entry in
|
||||
//the map are the same
|
||||
if (existingEntry != null && existingEntry.equals(existing)) {
|
||||
applyRetrospectively(existingEntry);
|
||||
this.info("updated policy for: " + existingEntry.getDestination());
|
||||
} else {
|
||||
this.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + existing.getDestination());
|
||||
throw new IllegalArgumentException("The policy can not be updated because it either does not exist or the PolicyEntry"
|
||||
+ " reference does not match an existing PolicyEntry in the PolicyMap. To replace an"
|
||||
+ " entry (versus modifying) or add, set createOrReplace to true. "
|
||||
+ existing + ", destination:" + existing.getDestination());
|
||||
}
|
||||
}
|
||||
|
||||
protected void applyRetrospectively(PolicyEntry updatedEntry) {
|
||||
RegionBroker regionBroker = (RegionBroker) this.getBrokerService().getRegionBroker();
|
||||
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
|
||||
Destination target = destination;
|
||||
if (destination instanceof DestinationFilter) {
|
||||
target = ((DestinationFilter)destination).getNext();
|
||||
}
|
||||
if (target.getActiveMQDestination().isQueue()) {
|
||||
updatedEntry.update((Queue) target);
|
||||
} else if (target.getActiveMQDestination().isTopic()) {
|
||||
updatedEntry.update((Topic) target);
|
||||
}
|
||||
this.debug("applied update to:" + target);
|
||||
}
|
||||
PolicyEntryUtil.applyRetrospectively(this, updatedEntry);
|
||||
}
|
||||
|
||||
//authentication plugin
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.plugin.util;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFilter;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker;
|
||||
|
||||
|
||||
public class PolicyEntryUtil {
|
||||
|
||||
|
||||
/**
|
||||
* Find a matching PolicyEntry by looking up the Set of entries from the map and
|
||||
* then comparing the destination to find the exact match. This lets us be able to
|
||||
* find the correct policy entry to update even though there might be multiple that
|
||||
* are returned from the get method of the PolicyMap.
|
||||
*
|
||||
* @param runtimeBroker
|
||||
* @param entry
|
||||
* @return
|
||||
*/
|
||||
public static PolicyEntry findEntryByDestination(AbstractRuntimeConfigurationBroker runtimeBroker,
|
||||
PolicyEntry entry) {
|
||||
|
||||
PolicyMap existingMap = runtimeBroker.getBrokerService().getDestinationPolicy();
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<PolicyEntry> existingEntries = existingMap.get(entry.getDestination());
|
||||
|
||||
//First just look up by the destination type to see if anything matches
|
||||
PolicyEntry existingEntry = null;
|
||||
for (PolicyEntry ee: existingEntries) {
|
||||
if (ee.getDestination().equals(entry.getDestination())) {
|
||||
existingEntry = ee;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return existingEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility to properly apply an updated policy entry to all existing destinations that
|
||||
* match this entry. The destination will only be updated if the policy is the exact
|
||||
* policy (most specific) that matches the destination.
|
||||
*
|
||||
* @param runtimeBroker
|
||||
* @param updatedEntry
|
||||
*/
|
||||
public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, PolicyEntry updatedEntry) {
|
||||
RegionBroker regionBroker = (RegionBroker) runtimeBroker.getBrokerService().getRegionBroker();
|
||||
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
|
||||
//Look up the policy that applies to the destination
|
||||
PolicyEntry specificyPolicy = regionBroker.getDestinationPolicy().getEntryFor(
|
||||
destination.getActiveMQDestination());
|
||||
|
||||
//only update the destination if it matches the specific policy being updated
|
||||
//currently just an identity check which is what we want
|
||||
if (updatedEntry.equals(specificyPolicy)){
|
||||
Destination target = destination;
|
||||
if (destination instanceof DestinationFilter) {
|
||||
target = ((DestinationFilter)destination).getNext();
|
||||
}
|
||||
if (target.getActiveMQDestination().isQueue()) {
|
||||
updatedEntry.update((Queue) target);
|
||||
} else if (target.getActiveMQDestination().isTopic()) {
|
||||
updatedEntry.update((Topic) target);
|
||||
}
|
||||
runtimeBroker.debug("applied update to:" + target);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq;
|
||||
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.junit.Test;
|
||||
|
@ -62,6 +63,43 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
|
|||
verifyTopicLimit("Before", 2048l);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModParentPolicy() throws Exception {
|
||||
final String brokerConfig = configurationSeed + "-policy-ml-broker";
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent");
|
||||
startBroker(brokerConfig);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent-mod", SLEEP);
|
||||
verifyQueueLimit("queue.test2", 4194304);
|
||||
|
||||
// change to existing dest
|
||||
verifyQueueLimit("queue.test", 4194304);
|
||||
//verify no change
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModChildPolicy() throws Exception {
|
||||
final String brokerConfig = configurationSeed + "-policy-ml-broker";
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent");
|
||||
startBroker(brokerConfig);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-child-mod", SLEEP);
|
||||
//verify no change
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
|
||||
// change to existing dest
|
||||
verifyQueueLimit("queue.child.test", 4194304);
|
||||
//new dest change
|
||||
verifyQueueLimit("queue.child.test2", 4194304);
|
||||
}
|
||||
|
||||
private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
|
||||
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
|
||||
try {
|
||||
|
|
|
@ -80,6 +80,215 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
|
|||
verifyQueueLimit("Before", 4194304);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModNewPolicyObject() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024);
|
||||
policyMap.setPolicyEntries(Arrays.asList(entry));
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
verifyQueueLimit("Before", 1024);
|
||||
|
||||
//Reapply new limit with new object that matches
|
||||
//the same destination, so it should still apply
|
||||
PolicyEntry entry2 = new PolicyEntry();
|
||||
entry2.setQueue(">");
|
||||
entry2.setMemoryLimit(4194304);
|
||||
javaConfigBroker.modifyPolicyEntry(entry2, true);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
// These should change because the policy entry passed in
|
||||
//matched an existing entry but was not the same reference.
|
||||
//Since createOrReplace is true, we replace the entry with
|
||||
//this new entry and apply
|
||||
verifyQueueLimit("Before", 4194304);
|
||||
verifyQueueLimit("After", 4194304);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a new policy is added and applied
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCreate() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setPolicyEntries(Arrays.asList());
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit());
|
||||
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024);
|
||||
|
||||
//The true flag should add the new policy
|
||||
javaConfigBroker.modifyPolicyEntry(entry, true);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
//Make sure the new policy is added and applied
|
||||
verifyQueueLimit("Before", 1024);
|
||||
verifyQueueLimit("After", 1024);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a new policy is not added
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testCreateFalse() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setPolicyEntries(Arrays.asList());
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit());
|
||||
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024);
|
||||
//The default should NOT add this policy since it won't match an existing policy to modify
|
||||
boolean caughtException = false;
|
||||
try {
|
||||
javaConfigBroker.modifyPolicyEntry(entry);
|
||||
} catch (IllegalArgumentException e) {
|
||||
caughtException = true;
|
||||
}
|
||||
assertTrue(caughtException);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
//Make sure there was no change
|
||||
verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit());
|
||||
verifyQueueLimit("After", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testModNewPolicyObjectCreateOrReplaceFalse() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024);
|
||||
policyMap.setPolicyEntries(Arrays.asList(entry));
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
verifyQueueLimit("Before", 1024);
|
||||
|
||||
//Reapply new limit with new object that matches
|
||||
//the same destination, but createOrReplace is false
|
||||
PolicyEntry entry2 = new PolicyEntry();
|
||||
entry2.setQueue(">");
|
||||
entry2.setMemoryLimit(4194304);
|
||||
boolean caughtException = false;
|
||||
try {
|
||||
javaConfigBroker.modifyPolicyEntry(entry2, false);
|
||||
} catch (IllegalArgumentException e) {
|
||||
caughtException = true;
|
||||
}
|
||||
assertTrue(caughtException);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
// These should not change because the policy entry passed in
|
||||
//matched an existing entry but was not the same reference.
|
||||
//Since createOrReplace is false, it should noo be updated
|
||||
verifyQueueLimit("Before", 1024);
|
||||
verifyQueueLimit("After", 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModWithChildPolicy() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue("queue.>");
|
||||
entry.setMemoryLimit(1024);
|
||||
PolicyEntry entry2 = new PolicyEntry();
|
||||
entry2.setQueue("queue.child.>");
|
||||
entry2.setMemoryLimit(2048);
|
||||
policyMap.setPolicyEntries(Arrays.asList(entry, entry2));
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
brokerService.getBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.test"), false);
|
||||
brokerService.getBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.child.test"), false);
|
||||
|
||||
//check destinations before policy updates
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
|
||||
//Reapply new limit to policy 2
|
||||
entry2.setMemoryLimit(4194304);
|
||||
javaConfigBroker.modifyPolicyEntry(entry2);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
//verify new dest and existing are changed
|
||||
verifyQueueLimit("queue.child.test", 4194304);
|
||||
verifyQueueLimit("queue.child.test2", 4194304);
|
||||
|
||||
//verify that destination at a higher level policy is not affected
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModParentPolicy() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue("queue.>");
|
||||
entry.setMemoryLimit(1024);
|
||||
PolicyEntry entry2 = new PolicyEntry();
|
||||
entry2.setQueue("queue.child.>");
|
||||
entry2.setMemoryLimit(2048);
|
||||
policyMap.setPolicyEntries(Arrays.asList(entry, entry2));
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
brokerService.getBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.test"), false);
|
||||
brokerService.getBroker().addDestination(
|
||||
brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.child.test"), false);
|
||||
|
||||
//check destinations before policy updates
|
||||
verifyQueueLimit("queue.test", 1024);
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
|
||||
//Reapply new limit to policy
|
||||
entry.setMemoryLimit(4194304);
|
||||
javaConfigBroker.modifyPolicyEntry(entry);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
//verify new dest and existing are not changed
|
||||
verifyQueueLimit("queue.child.test", 2048);
|
||||
verifyQueueLimit("queue.child.test2", 2048);
|
||||
|
||||
//verify that destination at a higher level policy is changed
|
||||
verifyQueueLimit("queue.test", 4194304);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddNdMod() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
|
@ -113,6 +322,53 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
|
|||
verifyTopicLimit("Before", 2048l);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddNdModWithMultiplePolicies() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setQueue(">");
|
||||
entry.setMemoryLimit(1024);
|
||||
policyMap.setPolicyEntries(Arrays.asList(entry));
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
startBroker(brokerService);
|
||||
assertTrue("broker alive", brokerService.isStarted());
|
||||
|
||||
verifyQueueLimit("Before", 1024);
|
||||
verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit());
|
||||
|
||||
entry.setMemoryLimit(2048);
|
||||
javaConfigBroker.modifyPolicyEntry(entry);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
PolicyEntry newEntry = new PolicyEntry();
|
||||
newEntry.setTopic("test2.>");
|
||||
newEntry.setMemoryLimit(2048);
|
||||
PolicyEntry newEntry2 = new PolicyEntry();
|
||||
newEntry2.setTopic("test2.test.>");
|
||||
newEntry2.setMemoryLimit(4000);
|
||||
javaConfigBroker.addNewPolicyEntry(newEntry);
|
||||
javaConfigBroker.addNewPolicyEntry(newEntry2);
|
||||
TimeUnit.SECONDS.sleep(SLEEP);
|
||||
|
||||
verifyTopicLimit("test2.after", 2048l);
|
||||
verifyTopicLimit("test2.test.after", 4000l);
|
||||
//check existing modified entry
|
||||
verifyQueueLimit("After", 2048);
|
||||
|
||||
// change to existing dest
|
||||
PolicyEntry newEntry3 = new PolicyEntry();
|
||||
newEntry3.setTopic(">");
|
||||
newEntry3.setMemoryLimit(5000);
|
||||
javaConfigBroker.addNewPolicyEntry(newEntry3);
|
||||
verifyTopicLimit("Before", 5000l);
|
||||
|
||||
//reverify children
|
||||
verifyTopicLimit("test2.after", 2048l);
|
||||
verifyTopicLimit("test2.test.after", 4000l);
|
||||
}
|
||||
|
||||
private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
|
||||
ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
|
||||
<plugins>
|
||||
<runtimeConfigurationPlugin checkPeriod="1000" />
|
||||
</plugins>
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry queue="queue.>" memoryLimit="1024b"/>
|
||||
<policyEntry queue="queue.child.>" memoryLimit="4mb"/>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
</broker>
|
||||
</beans>
|
|
@ -0,0 +1,37 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
|
||||
<plugins>
|
||||
<runtimeConfigurationPlugin checkPeriod="1000" />
|
||||
</plugins>
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry queue="queue.>" memoryLimit="4mb"/>
|
||||
<policyEntry queue="queue.child.>" memoryLimit="2048b"/>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
</broker>
|
||||
</beans>
|
|
@ -0,0 +1,37 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
|
||||
<plugins>
|
||||
<runtimeConfigurationPlugin checkPeriod="1000" />
|
||||
</plugins>
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry queue="queue.>" memoryLimit="1024b"/>
|
||||
<policyEntry queue="queue.child.>" memoryLimit="2048b"/>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
</broker>
|
||||
</beans>
|
Loading…
Reference in New Issue