From 9e7fae0d83c584f98e99024ba6d20e53f14b81f7 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Mon, 14 Dec 2015 18:51:11 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6088 The runtime plugins will now find the exact policy to update which means that a destination can match more than one policy and the policy can still be updated at runtime. The java runtime broker also supports the ability to replace or add a policy entry based on a flag on a new method call. --- .../AbstractRuntimeConfigurationBroker.java | 6 +- .../activemq/plugin/PolicyEntryProcessor.java | 33 +-- .../java/JavaRuntimeConfigurationBroker.java | 86 ++++-- .../activemq/plugin/util/PolicyEntryUtil.java | 93 +++++++ .../org/apache/activemq/PolicyEntryTest.java | 38 +++ .../activemq/java/JavaPolicyEntryTest.java | 256 ++++++++++++++++++ .../policyEntryTest-policy-ml-child-mod.xml | 37 +++ .../policyEntryTest-policy-ml-parent-mod.xml | 37 +++ .../policyEntryTest-policy-ml-parent.xml | 37 +++ 9 files changed, 573 insertions(+), 50 deletions(-) create mode 100644 activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-child-mod.xml create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent-mod.xml create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent.xml diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java index a430339b08..c672579b33 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java @@ -143,11 +143,11 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { } } - protected void debug(String s) { + public void debug(String s) { LOG.debug(s); } - protected void info(String s) { + public void info(String s) { LOG.info(filterPasswords(s)); if (infoString != null) { infoString += s; @@ -155,7 +155,7 @@ public class AbstractRuntimeConfigurationBroker extends BrokerFilter { } } - protected void info(String s, Throwable t) { + public void info(String s, Throwable t) { LOG.info(filterPasswords(s), t); if (infoString != null) { infoString += s; diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java index 4d0dc421b9..2bf7535197 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java @@ -16,15 +16,13 @@ */ package org.apache.activemq.plugin; -import org.apache.activemq.broker.region.*; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; - -import java.util.Set; +import org.apache.activemq.plugin.util.PolicyEntryUtil; public class PolicyEntryProcessor extends DefaultConfigurationProcessor { - public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) { + public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) { super(plugin, configurationClass); } @@ -39,33 +37,22 @@ public class PolicyEntryProcessor extends DefaultConfigurationProcessor { @Override public void modify(Object existing, Object candidate) { - PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy(); - PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry()); - Set existingEntry = existingMap.get(updatedEntry.getDestination()); - if (existingEntry.size() == 1) { - updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next()); + //Look up an existing entry that matches the candidate + //First just look up by the destination type to see if anything matches + PolicyEntry existingEntry = PolicyEntryUtil.findEntryByDestination(plugin, updatedEntry); + if (existingEntry != null) { + //if found, update the policy and apply the updates to existing destinations + updatedEntry = fromDto(candidate, existingEntry); applyRetrospectively(updatedEntry); plugin.info("updated policy for: " + updatedEntry.getDestination()); } else { - plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination()); + plugin.info("cannot find policy entry candidate to update: " + updatedEntry + ", destination:" + updatedEntry.getDestination()); } } protected void applyRetrospectively(PolicyEntry updatedEntry) { - RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker(); - for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { - Destination target = destination; - if (destination instanceof DestinationFilter) { - target = ((DestinationFilter)destination).getNext(); - } - if (target.getActiveMQDestination().isQueue()) { - updatedEntry.update((Queue) target); - } else if (target.getActiveMQDestination().isTopic()) { - updatedEntry.update((Topic) target); - } - plugin.debug("applied update to:" + target); - } + PolicyEntryUtil.applyRetrospectively(plugin, updatedEntry); } } diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java index ca4fd50391..8e4ed2988e 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java @@ -17,14 +17,8 @@ package org.apache.activemq.plugin.java; import java.util.Arrays; -import java.util.Set; import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFilter; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.virtual.VirtualDestination; @@ -32,6 +26,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker; import org.apache.activemq.plugin.UpdateVirtualDestinationsTask; +import org.apache.activemq.plugin.util.PolicyEntryUtil; import org.apache.activemq.security.AuthorizationBroker; import org.apache.activemq.security.AuthorizationMap; import org.apache.activemq.security.SimpleAuthenticationBroker; @@ -142,32 +137,75 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration info("added policy for: " + addition.getDestination()); } + + /** + * This method will modify an existing policy entry that matches the destination + * set on the PolicyEntry passed in. + * + * The PolicyEntry reference must already be in the PolicyMap or it won't be updated. + * To modify the entry the best way is to look up the existing PolicyEntry from the + * PolicyMap, make changes to it, and pass it to this method to apply. + * + * To create or replace an existing entry (if the destination matches), see + * {@link #modifyPolicyEntry(PolicyEntry, boolean) + * + * + * @param existing + */ public void modifyPolicyEntry(PolicyEntry existing) { + modifyPolicyEntry(existing, false); + } + + /** + * This method will modify an existing policy entry that matches the destination + * set on the PolicyEntry passed in. If createOrReplace is true, a new policy + * will be created if it doesn't exist and a policy will be replaced in the PolicyMap, + * versus modified, if it is a different reference but the destinations for the Policy match. + * + * If createOrReplace is false, the policy update will only be applied if + * the PolicyEntry reference already exists in the PolicyMap. + * + * @param existing + * @param createIfAbsent + */ + public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) { PolicyMap existingMap = this.getBrokerService().getDestinationPolicy(); - Set existingEntry = existingMap.get(existing.getDestination()); - if (existingEntry.size() == 1) { - applyRetrospectively(existing); - this.info("updated policy for: " + existing.getDestination()); + //First just look up by the destination type to see if anything matches + PolicyEntry existingEntry = PolicyEntryUtil.findEntryByDestination(this, existing); + + //handle createOrReplace + if (createOrReplace) { + //if not found at all, go ahead and insert the policy entry + if (existingEntry == null) { + existingMap.put(existing.getDestination(), existing); + existingEntry = existing; + //If found but the objects are different, remove the old policy entry + //and replace it with the new one + } else if (!existing.equals(existingEntry)) { + synchronized(existingMap) { + existingMap.remove(existingEntry.getDestination(), existingEntry); + existingMap.put(existing.getDestination(), existing); + } + existingEntry = existing; + } + } + + //Make sure that at this point the passed in object and the entry in + //the map are the same + if (existingEntry != null && existingEntry.equals(existing)) { + applyRetrospectively(existingEntry); + this.info("updated policy for: " + existingEntry.getDestination()); } else { - this.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + existing.getDestination()); + throw new IllegalArgumentException("The policy can not be updated because it either does not exist or the PolicyEntry" + + " reference does not match an existing PolicyEntry in the PolicyMap. To replace an" + + " entry (versus modifying) or add, set createOrReplace to true. " + + existing + ", destination:" + existing.getDestination()); } } protected void applyRetrospectively(PolicyEntry updatedEntry) { - RegionBroker regionBroker = (RegionBroker) this.getBrokerService().getRegionBroker(); - for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { - Destination target = destination; - if (destination instanceof DestinationFilter) { - target = ((DestinationFilter)destination).getNext(); - } - if (target.getActiveMQDestination().isQueue()) { - updatedEntry.update((Queue) target); - } else if (target.getActiveMQDestination().isTopic()) { - updatedEntry.update((Topic) target); - } - this.debug("applied update to:" + target); - } + PolicyEntryUtil.applyRetrospectively(this, updatedEntry); } //authentication plugin diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java new file mode 100644 index 0000000000..7f38f5bd43 --- /dev/null +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin.util; + +import java.util.Set; + +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker; + + +public class PolicyEntryUtil { + + + /** + * Find a matching PolicyEntry by looking up the Set of entries from the map and + * then comparing the destination to find the exact match. This lets us be able to + * find the correct policy entry to update even though there might be multiple that + * are returned from the get method of the PolicyMap. + * + * @param runtimeBroker + * @param entry + * @return + */ + public static PolicyEntry findEntryByDestination(AbstractRuntimeConfigurationBroker runtimeBroker, + PolicyEntry entry) { + + PolicyMap existingMap = runtimeBroker.getBrokerService().getDestinationPolicy(); + @SuppressWarnings("unchecked") + Set existingEntries = existingMap.get(entry.getDestination()); + + //First just look up by the destination type to see if anything matches + PolicyEntry existingEntry = null; + for (PolicyEntry ee: existingEntries) { + if (ee.getDestination().equals(entry.getDestination())) { + existingEntry = ee; + break; + } + } + return existingEntry; + } + + /** + * Utility to properly apply an updated policy entry to all existing destinations that + * match this entry. The destination will only be updated if the policy is the exact + * policy (most specific) that matches the destination. + * + * @param runtimeBroker + * @param updatedEntry + */ + public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, PolicyEntry updatedEntry) { + RegionBroker regionBroker = (RegionBroker) runtimeBroker.getBrokerService().getRegionBroker(); + for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { + //Look up the policy that applies to the destination + PolicyEntry specificyPolicy = regionBroker.getDestinationPolicy().getEntryFor( + destination.getActiveMQDestination()); + + //only update the destination if it matches the specific policy being updated + //currently just an identity check which is what we want + if (updatedEntry.equals(specificyPolicy)){ + Destination target = destination; + if (destination instanceof DestinationFilter) { + target = ((DestinationFilter)destination).getNext(); + } + if (target.getActiveMQDestination().isQueue()) { + updatedEntry.update((Queue) target); + } else if (target.getActiveMQDestination().isTopic()) { + updatedEntry.update((Topic) target); + } + runtimeBroker.debug("applied update to:" + target); + } + } + } +} diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 0c144525f2..120a4c62a3 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -17,6 +17,7 @@ package org.apache.activemq; import javax.jms.Session; + import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; @@ -62,6 +63,43 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport { verifyTopicLimit("Before", 2048l); } + @Test + public void testModParentPolicy() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("queue.test", 1024); + verifyQueueLimit("queue.child.test", 2048); + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent-mod", SLEEP); + verifyQueueLimit("queue.test2", 4194304); + + // change to existing dest + verifyQueueLimit("queue.test", 4194304); + //verify no change + verifyQueueLimit("queue.child.test", 2048); + } + + @Test + public void testModChildPolicy() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-parent"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("queue.test", 1024); + verifyQueueLimit("queue.child.test", 2048); + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-child-mod", SLEEP); + //verify no change + verifyQueueLimit("queue.test", 1024); + + // change to existing dest + verifyQueueLimit("queue.child.test", 4194304); + //new dest change + verifyQueueLimit("queue.child.test2", 4194304); + } + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); try { diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java index 94a5496c22..accae06621 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java @@ -80,6 +80,215 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { verifyQueueLimit("Before", 4194304); } + @Test + public void testModNewPolicyObject() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + + //Reapply new limit with new object that matches + //the same destination, so it should still apply + PolicyEntry entry2 = new PolicyEntry(); + entry2.setQueue(">"); + entry2.setMemoryLimit(4194304); + javaConfigBroker.modifyPolicyEntry(entry2, true); + TimeUnit.SECONDS.sleep(SLEEP); + + // These should change because the policy entry passed in + //matched an existing entry but was not the same reference. + //Since createOrReplace is true, we replace the entry with + //this new entry and apply + verifyQueueLimit("Before", 4194304); + verifyQueueLimit("After", 4194304); + } + + /** + * Test that a new policy is added and applied + * + * @throws Exception + */ + @Test + public void testCreate() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(Arrays.asList()); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit()); + + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + + //The true flag should add the new policy + javaConfigBroker.modifyPolicyEntry(entry, true); + TimeUnit.SECONDS.sleep(SLEEP); + + //Make sure the new policy is added and applied + verifyQueueLimit("Before", 1024); + verifyQueueLimit("After", 1024); + } + + /** + * Test that a new policy is not added + * @throws Exception + */ + @Test + public void testCreateFalse() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(Arrays.asList()); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit()); + + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + //The default should NOT add this policy since it won't match an existing policy to modify + boolean caughtException = false; + try { + javaConfigBroker.modifyPolicyEntry(entry); + } catch (IllegalArgumentException e) { + caughtException = true; + } + assertTrue(caughtException); + TimeUnit.SECONDS.sleep(SLEEP); + + //Make sure there was no change + verifyQueueLimit("Before", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit()); + verifyQueueLimit("After", (int)brokerService.getSystemUsage().getMemoryUsage().getLimit()); + } + + + @Test + public void testModNewPolicyObjectCreateOrReplaceFalse() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + + //Reapply new limit with new object that matches + //the same destination, but createOrReplace is false + PolicyEntry entry2 = new PolicyEntry(); + entry2.setQueue(">"); + entry2.setMemoryLimit(4194304); + boolean caughtException = false; + try { + javaConfigBroker.modifyPolicyEntry(entry2, false); + } catch (IllegalArgumentException e) { + caughtException = true; + } + assertTrue(caughtException); + TimeUnit.SECONDS.sleep(SLEEP); + + // These should not change because the policy entry passed in + //matched an existing entry but was not the same reference. + //Since createOrReplace is false, it should noo be updated + verifyQueueLimit("Before", 1024); + verifyQueueLimit("After", 1024); + } + + @Test + public void testModWithChildPolicy() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue("queue.>"); + entry.setMemoryLimit(1024); + PolicyEntry entry2 = new PolicyEntry(); + entry2.setQueue("queue.child.>"); + entry2.setMemoryLimit(2048); + policyMap.setPolicyEntries(Arrays.asList(entry, entry2)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + brokerService.getBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.test"), false); + brokerService.getBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.child.test"), false); + + //check destinations before policy updates + verifyQueueLimit("queue.test", 1024); + verifyQueueLimit("queue.child.test", 2048); + + //Reapply new limit to policy 2 + entry2.setMemoryLimit(4194304); + javaConfigBroker.modifyPolicyEntry(entry2); + TimeUnit.SECONDS.sleep(SLEEP); + + //verify new dest and existing are changed + verifyQueueLimit("queue.child.test", 4194304); + verifyQueueLimit("queue.child.test2", 4194304); + + //verify that destination at a higher level policy is not affected + verifyQueueLimit("queue.test", 1024); + } + + @Test + public void testModParentPolicy() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + + PolicyEntry entry = new PolicyEntry(); + entry.setQueue("queue.>"); + entry.setMemoryLimit(1024); + PolicyEntry entry2 = new PolicyEntry(); + entry2.setQueue("queue.child.>"); + entry2.setMemoryLimit(2048); + policyMap.setPolicyEntries(Arrays.asList(entry, entry2)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + brokerService.getBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.test"), false); + brokerService.getBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQQueue("queue.child.test"), false); + + //check destinations before policy updates + verifyQueueLimit("queue.test", 1024); + verifyQueueLimit("queue.child.test", 2048); + + //Reapply new limit to policy + entry.setMemoryLimit(4194304); + javaConfigBroker.modifyPolicyEntry(entry); + TimeUnit.SECONDS.sleep(SLEEP); + + //verify new dest and existing are not changed + verifyQueueLimit("queue.child.test", 2048); + verifyQueueLimit("queue.child.test2", 2048); + + //verify that destination at a higher level policy is changed + verifyQueueLimit("queue.test", 4194304); + } + @Test public void testAddNdMod() throws Exception { BrokerService brokerService = new BrokerService(); @@ -113,6 +322,53 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { verifyTopicLimit("Before", 2048l); } + @Test + public void testAddNdModWithMultiplePolicies() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit()); + + entry.setMemoryLimit(2048); + javaConfigBroker.modifyPolicyEntry(entry); + TimeUnit.SECONDS.sleep(SLEEP); + + PolicyEntry newEntry = new PolicyEntry(); + newEntry.setTopic("test2.>"); + newEntry.setMemoryLimit(2048); + PolicyEntry newEntry2 = new PolicyEntry(); + newEntry2.setTopic("test2.test.>"); + newEntry2.setMemoryLimit(4000); + javaConfigBroker.addNewPolicyEntry(newEntry); + javaConfigBroker.addNewPolicyEntry(newEntry2); + TimeUnit.SECONDS.sleep(SLEEP); + + verifyTopicLimit("test2.after", 2048l); + verifyTopicLimit("test2.test.after", 4000l); + //check existing modified entry + verifyQueueLimit("After", 2048); + + // change to existing dest + PolicyEntry newEntry3 = new PolicyEntry(); + newEntry3.setTopic(">"); + newEntry3.setMemoryLimit(5000); + javaConfigBroker.addNewPolicyEntry(newEntry3); + verifyTopicLimit("Before", 5000l); + + //reverify children + verifyTopicLimit("test2.after", 2048l); + verifyTopicLimit("test2.test.after", 4000l); + } + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(); try { diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-child-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-child-mod.xml new file mode 100644 index 0000000000..86b524936f --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-child-mod.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent-mod.xml new file mode 100644 index 0000000000..5a5b784e29 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent-mod.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent.xml new file mode 100644 index 0000000000..7756f4ff66 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-parent.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + +