diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 97d9155dad..01343a298a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.region.policy; +import java.util.Set; + import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.BaseDestination; @@ -135,18 +137,48 @@ public class PolicyEntry extends DestinationMapEntry { } public void update(Queue queue) { - baseUpdate(queue); - if (memoryLimit > 0) { + update(queue, null); + } + + /** + * Update a queue with this policy. Only apply properties that + * match the includedProperties list. Not all properties are eligible + * to be updated. + * + * If includedProperties is null then all of the properties will be set as + * isUpdate will return true + * @param baseDestination + * @param includedProperties + */ + public void update(Queue queue, Set includedProperties) { + baseUpdate(queue, includedProperties); + if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { queue.getMemoryUsage().setLimit(memoryLimit); } - queue.setUseConsumerPriority(isUseConsumerPriority()); - queue.setStrictOrderDispatch(isStrictOrderDispatch()); - queue.setOptimizedDispatch(isOptimizedDispatch()); - queue.setLazyDispatch(isLazyDispatch()); - queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); - queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); - queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); - queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); + if (isUpdate("useConsumerPriority", includedProperties)) { + queue.setUseConsumerPriority(isUseConsumerPriority()); + } + if (isUpdate("strictOrderDispatch", includedProperties)) { + queue.setStrictOrderDispatch(isStrictOrderDispatch()); + } + if (isUpdate("optimizedDispatch", includedProperties)) { + queue.setOptimizedDispatch(isOptimizedDispatch()); + } + if (isUpdate("lazyDispatch", includedProperties)) { + queue.setLazyDispatch(isLazyDispatch()); + } + if (isUpdate("timeBeforeDispatchStarts", includedProperties)) { + queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); + } + if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) { + queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); + } + if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) { + queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); + } + if (isUpdate("persistJMSRedelivered", includedProperties)) { + queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); + } } public void configure(Broker broker,Topic topic) { @@ -167,42 +199,100 @@ public class PolicyEntry extends DestinationMapEntry { } public void update(Topic topic) { - baseUpdate(topic); - if (memoryLimit > 0) { + update(topic, null); + } + + //If includedProperties is null then all of the properties will be set as + //isUpdate will return true + public void update(Topic topic, Set includedProperties) { + baseUpdate(topic, includedProperties); + if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { topic.getMemoryUsage().setLimit(memoryLimit); } - topic.setLazyDispatch(isLazyDispatch()); + if (isUpdate("lazyDispatch", includedProperties)) { + topic.setLazyDispatch(isLazyDispatch()); + } } // attributes that can change on the fly public void baseUpdate(BaseDestination destination) { - destination.setProducerFlowControl(isProducerFlowControl()); - destination.setAlwaysRetroactive(isAlwaysRetroactive()); - destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); + baseUpdate(destination, null); + } - destination.setMaxPageSize(getMaxPageSize()); - destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); + // attributes that can change on the fly + //If includedProperties is null then all of the properties will be set as + //isUpdate will return true + public void baseUpdate(BaseDestination destination, Set includedProperties) { + if (isUpdate("producerFlowControl", includedProperties)) { + destination.setProducerFlowControl(isProducerFlowControl()); + } + if (isUpdate("alwaysRetroactive", includedProperties)) { + destination.setAlwaysRetroactive(isAlwaysRetroactive()); + } + if (isUpdate("blockedProducerWarningInterval", includedProperties)) { + destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); + } + if (isUpdate("maxPageSize", includedProperties)) { + destination.setMaxPageSize(getMaxPageSize()); + } + if (isUpdate("maxBrowsePageSize", includedProperties)) { + destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); + } - destination.setMinimumMessageSize((int) getMinimumMessageSize()); - destination.setMaxExpirePageSize(getMaxExpirePageSize()); - destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); - destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); - - destination.setGcIfInactive(isGcInactiveDestinations()); - destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); - destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); - destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); - destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); - destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); - - destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); - destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); - destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); - destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); - destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); - destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); - destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); - destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); + if (isUpdate("minimumMessageSize", includedProperties)) { + destination.setMinimumMessageSize((int) getMinimumMessageSize()); + } + if (isUpdate("maxExpirePageSize", includedProperties)) { + destination.setMaxExpirePageSize(getMaxExpirePageSize()); + } + if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) { + destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + } + if (isUpdate("storeUsageHighWaterMark", includedProperties)) { + destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); + } + if (isUpdate("gcInactiveDestinations", includedProperties)) { + destination.setGcIfInactive(isGcInactiveDestinations()); + } + if (isUpdate("gcWithNetworkConsumers", includedProperties)) { + destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); + } + if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) { + destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); + } + if (isUpdate("reduceMemoryFootprint", includedProperties)) { + destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); + } + if (isUpdate("doOptimizeMessageStore", includedProperties)) { + destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); + } + if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) { + destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); + } + if (isUpdate("advisoryForConsumed", includedProperties)) { + destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); + } + if (isUpdate("advisoryForDelivery", includedProperties)) { + destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); + } + if (isUpdate("advisoryForDiscardingMessages", includedProperties)) { + destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); + } + if (isUpdate("advisoryForSlowConsumers", includedProperties)) { + destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); + } + if (isUpdate("advisoryForFastProducers", includedProperties)) { + destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); + } + if (isUpdate("advisoryWhenFull", includedProperties)) { + destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); + } + if (isUpdate("includeBodyForAdvisory", includedProperties)) { + destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); + } + if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { + destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -321,6 +411,9 @@ public class PolicyEntry extends DestinationMapEntry { } } + private boolean isUpdate(String property, Set includedProperties) { + return includedProperties == null || includedProperties.contains(property); + } // Properties // ------------------------------------------------------------------------- public DispatchPolicy getDispatchPolicy() { diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java index 8e4ed2988e..536909e09c 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/java/JavaRuntimeConfigurationBroker.java @@ -17,6 +17,8 @@ package org.apache.activemq.plugin.java; import java.util.Arrays; +import java.util.List; +import java.util.Set; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -133,7 +135,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration public void addNewPolicyEntry(PolicyEntry addition) { PolicyMap existingMap = getBrokerService().getDestinationPolicy(); existingMap.put(addition.getDestination(), addition); - applyRetrospectively(addition); + PolicyEntryUtil.applyRetrospectively(this, addition, null); info("added policy for: " + addition.getDestination()); } @@ -156,6 +158,10 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration modifyPolicyEntry(existing, false); } + public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) { + modifyPolicyEntry(existing, createOrReplace, null); + } + /** * This method will modify an existing policy entry that matches the destination * set on the PolicyEntry passed in. If createOrReplace is true, a new policy @@ -165,10 +171,16 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration * If createOrReplace is false, the policy update will only be applied if * the PolicyEntry reference already exists in the PolicyMap. * + * includedProperties is a list of properties that will be applied retrospectively. If + * the list is null, then all properties on the policy will be reapplied to the destination. + * This allows the ability to limit which properties are applied to existing destinations. + * * @param existing * @param createIfAbsent + * @param includedProperties - optional list of properties to apply retrospectively */ - public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) { + public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace, + Set includedProperties) { PolicyMap existingMap = this.getBrokerService().getDestinationPolicy(); //First just look up by the destination type to see if anything matches @@ -194,7 +206,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration //Make sure that at this point the passed in object and the entry in //the map are the same if (existingEntry != null && existingEntry.equals(existing)) { - applyRetrospectively(existingEntry); + PolicyEntryUtil.applyRetrospectively(this, existingEntry, includedProperties); this.info("updated policy for: " + existingEntry.getDestination()); } else { throw new IllegalArgumentException("The policy can not be updated because it either does not exist or the PolicyEntry" @@ -204,10 +216,6 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration } } - protected void applyRetrospectively(PolicyEntry updatedEntry) { - PolicyEntryUtil.applyRetrospectively(this, updatedEntry); - } - //authentication plugin public void updateSimpleAuthenticationPlugin(final SimpleAuthenticationPlugin updatedPlugin) { try { diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java index 7f38f5bd43..5ac135adcc 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/util/PolicyEntryUtil.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.plugin.util; +import java.util.List; import java.util.Set; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.Queue; @@ -67,7 +69,27 @@ public class PolicyEntryUtil { * @param runtimeBroker * @param updatedEntry */ - public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, PolicyEntry updatedEntry) { + public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, + PolicyEntry updatedEntry) { + PolicyEntryUtil.applyRetrospectively(runtimeBroker, updatedEntry, null); + } + + /** + * + * Utility to properly apply an updated policy entry to all existing destinations that + * match this entry. The destination will only be updated if the policy is the exact + * policy (most specific) that matches the destination. + * + * The includedProperties List is optional and is used to specify a list of properties + * to apply retrospectively to the matching destinations. This allows only certain properties + * to be reapplied. If the list is null then all properties will be applied. + * + * @param runtimeBroker + * @param updatedEntry + * @param includedProperties + */ + public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, + PolicyEntry updatedEntry, Set includedProperties) { RegionBroker regionBroker = (RegionBroker) runtimeBroker.getBrokerService().getRegionBroker(); for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { //Look up the policy that applies to the destination @@ -78,13 +100,15 @@ public class PolicyEntryUtil { //currently just an identity check which is what we want if (updatedEntry.equals(specificyPolicy)){ Destination target = destination; - if (destination instanceof DestinationFilter) { - target = ((DestinationFilter)destination).getNext(); + while (target instanceof DestinationFilter) { + target = ((DestinationFilter)target).getNext(); } + //If we are providing a list of properties to set then use them + //to set eligible properties that are in the includedProperties list if (target.getActiveMQDestination().isQueue()) { - updatedEntry.update((Queue) target); + updatedEntry.update((Queue) target, includedProperties); } else if (target.getActiveMQDestination().isTopic()) { - updatedEntry.update((Topic) target); + updatedEntry.update((Topic) target, includedProperties); } runtimeBroker.debug("applied update to:" + target); } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java index accae06621..29b655e8cd 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java @@ -17,9 +17,12 @@ package org.apache.activemq.java; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.jms.Session; @@ -29,6 +32,9 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RuntimeConfigTestSupport; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; @@ -53,6 +59,11 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); } + /** + * Test modifying a policy + * + * @throws Exception + */ @Test public void testMod() throws Exception { BrokerService brokerService = new BrokerService(); @@ -63,10 +74,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { policyMap.setPolicyEntries(Arrays.asList(entry)); brokerService.setDestinationPolicy(policyMap); - startBroker(brokerService); assertTrue("broker alive", brokerService.isStarted()); - verifyQueueLimit("Before", 1024); //Reapply new limit @@ -80,6 +89,150 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { verifyQueueLimit("Before", 4194304); } + /** + * Test modifying a policy but only applying a subset o + * properties retroactively to existing destinations + * + * @throws Exception + */ + @Test + public void testModFilterProperties() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + entry.setMaxPageSize(500); + entry.setMaxBrowsePageSize(100); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + verifyQueueLimit("Before", 1024); + assertEquals(500, getQueue("Before").getMaxPageSize()); + assertEquals(100, getQueue("Before").getMaxBrowsePageSize()); + + //Reapply new limit, add the property to the list of included properties + entry.setMemoryLimit(4194304); + entry.setMaxPageSize(300); + entry.setMaxBrowsePageSize(200); + Set properties = new HashSet<>(); + properties.add("memoryLimit"); + properties.add("maxPageSize"); + javaConfigBroker.modifyPolicyEntry(entry, false, properties); + TimeUnit.SECONDS.sleep(SLEEP); + + verifyQueueLimit("After", 4194304); + assertEquals(300, getQueue("After").getMaxPageSize()); + assertEquals(200, getQueue("After").getMaxBrowsePageSize()); + + // change to existing dest, maxBrowsePageSize was not included + //in the property list so it should not have changed + verifyQueueLimit("Before", 4194304); + assertEquals(300, getQueue("Before").getMaxPageSize()); + assertEquals(100, getQueue("Before").getMaxBrowsePageSize()); + } + + @Test + public void testModQueueAndTopic() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry qEntry = new PolicyEntry(); + qEntry.setQueue(">"); + qEntry.setPersistJMSRedelivered(true); + PolicyEntry tEntry = new PolicyEntry(); + tEntry.setTopic(">"); + tEntry.setLazyDispatch(true); + policyMap.setPolicyEntries(Arrays.asList(qEntry, tEntry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + assertEquals(true, getQueue("queueBefore").isPersistJMSRedelivered()); + assertEquals(true, getTopic("topicBefore").isLazyDispatch()); + + //Reapply new limit, add the property to the list of included properties + qEntry.setPersistJMSRedelivered(false); + tEntry.setLazyDispatch(false); + Set queueProperties = new HashSet<>(); + queueProperties.add("persistJMSRedelivered"); + Set topicProperties = new HashSet<>(); + topicProperties.add("lazyDispatch"); + javaConfigBroker.modifyPolicyEntry(qEntry, false, queueProperties); + javaConfigBroker.modifyPolicyEntry(tEntry, false, topicProperties); + TimeUnit.SECONDS.sleep(SLEEP); + + assertEquals(false, getQueue("queueBefore").isPersistJMSRedelivered()); + assertEquals(false, getTopic("topicBefore").isLazyDispatch()); + + assertEquals(false, getQueue("queueAfter").isPersistJMSRedelivered()); + assertEquals(false, getTopic("topicAfter").isLazyDispatch()); + } + + /** + * Test that a property that is not part of the update methods (can't be changed after creation) + * will not be applied to existing destinations but will be applied to new destinations + * + * @throws Exception + */ + @Test + public void testModFilterExcludedProperty() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setEnableAudit(true); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + assertTrue(getQueue("Before").isEnableAudit()); + + //Reapply new limit, add the property to the list of included properties + entry.setEnableAudit(false); + Set properties = new HashSet<>(); + properties.add("enableAudit"); + javaConfigBroker.modifyPolicyEntry(entry, false, properties); + TimeUnit.SECONDS.sleep(SLEEP); + + //no change because enableAudit is excluded + assertTrue(getQueue("Before").isEnableAudit()); + + //A new destination should have the property changed + assertFalse(getQueue("After").isEnableAudit()); + } + + @Test + public void testModFilterPropertiesInvalid() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + verifyQueueLimit("Before", 1024); + + //use a property that doesn't exist, so nothing should be updated + entry.setMemoryLimit(4194304); + Set properties = new HashSet<>(); + properties.add("invalid"); + javaConfigBroker.modifyPolicyEntry(entry, false, properties); + TimeUnit.SECONDS.sleep(SLEEP); + + //This should be unchanged as the list of properties only + //has an invalid property so nothing will be re-applied retrospectively + verifyQueueLimit("Before", 1024); + + //A new destination should be updated because the policy was changed + verifyQueueLimit("After", 4194304); + } + @Test public void testModNewPolicyObject() throws Exception { BrokerService brokerService = new BrokerService(); @@ -114,6 +267,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { /** * Test that a new policy is added and applied + * Test that a new policy will be added when setting createOrReplace to true + * when calling modifyPolicyEntry * * @throws Exception */ @@ -143,6 +298,9 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { /** * Test that a new policy is not added + * Pass a new policy to modifyPolicyEntry which should throw an exception + * because the policy didn't already exist + * * @throws Exception */ @Test @@ -185,10 +343,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { policyMap.setPolicyEntries(Arrays.asList(entry)); brokerService.setDestinationPolicy(policyMap); - startBroker(brokerService); assertTrue("broker alive", brokerService.isStarted()); - verifyQueueLimit("Before", 1024); //Reapply new limit with new object that matches @@ -369,6 +525,133 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { verifyTopicLimit("test2.test.after", 4000l); } + @Test + public void testAllQueuePropertiesApplied() throws Exception { + testAllQueuePropertiesAppliedFilter(null); + } + + /** + * Make sure all properties set on the filter Set are applied + * + * @throws Exception + */ + @Test + public void testAllQueuePropertiesAppliedFilter() throws Exception { + testAllQueuePropertiesAppliedFilter(getQueuePropertySet()); + } + + /** + * Make sure all properties set on the filter Set are applied + * + * @throws Exception + */ + @Test + public void testAllTopicPropertiesAppliedFilter() throws Exception { + testAllTopicPropertiesAppliedFilter(getTopicPropertySet()); + } + + @Test + public void testAllTopicPropertiesApplied() throws Exception { + testAllTopicPropertiesAppliedFilter(null); + } + + private void testAllQueuePropertiesAppliedFilter(Set properties) throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + + //initial config + setAllDestPolicyProperties(entry, true, true, 10, + 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 30, true, true, true, true, true, true, true, true); + setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100, + 100, true, true); + + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + //validate config + assertAllDestPolicyProperties(getQueue("Before"), true, true, 10, + 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 30, true, true, true, true, true, true, true, true); + assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100, + 100, true, true); + + + //change config + setAllDestPolicyProperties(entry, false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000, + 1000, false, false); + + javaConfigBroker.modifyPolicyEntry(entry, false, properties); + TimeUnit.SECONDS.sleep(SLEEP); + + assertAllDestPolicyProperties(getQueue("Before"), false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000, + 1000, false, false); + + //check new dest + assertAllDestPolicyProperties(getQueue("After"), false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000, + 1000, false, false); + } + + private void testAllTopicPropertiesAppliedFilter(Set properties) throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setTopic(">"); + + //initial config + setAllDestPolicyProperties(entry, true, true, 10, + 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 30, true, true, true, true, true, true, true, true); + setAllTopicPolicyProperties(entry, 10000, true); + + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + //validate config + assertAllDestPolicyProperties(getTopic("Before"), true, true, 10, + 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 30, true, true, true, true, true, true, true, true); + assertAllTopicPolicyProperties(getTopic("Before"), 10000, true); + + + //change config + setAllDestPolicyProperties(entry, false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + setAllTopicPolicyProperties(entry, 100000, false); + + javaConfigBroker.modifyPolicyEntry(entry, false, properties); + TimeUnit.SECONDS.sleep(SLEEP); + + assertAllDestPolicyProperties(getTopic("Before"), false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + assertAllTopicPolicyProperties(getTopic("Before"), 100000, false); + + //check new dest + assertAllDestPolicyProperties(getTopic("After"), false, false, 100, + 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 300, false, false, false, false, false, false, false, false); + assertAllTopicPolicyProperties(getTopic("After"), 100000, false); + } + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(); try { @@ -376,7 +659,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session.createConsumer(session.createQueue(dest)); - assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit()); + assertEquals(memoryLimit, getQueue(dest).getMemoryUsage().getLimit()); } finally { connection.close(); } @@ -389,9 +672,182 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session.createConsumer(session.createTopic(dest)); - assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit()); + assertEquals(memoryLimit, getTopic(dest).getMemoryUsage().getLimit()); } finally { connection.close(); } } + + private Queue getQueue(String queue) throws Exception { + return (Queue) brokerService.getRegionBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQQueue(queue), false); + } + + private Topic getTopic(String topic) throws Exception { + return (Topic) brokerService.getRegionBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQTopic(topic), false); + } + + private Set getQueuePropertySet() { + Set properties = new HashSet<>(getDestPropertySet()); + properties.add("memoryLimit"); + properties.add("useConsumerPriority"); + properties.add("strictOrderDispatch"); + properties.add("optimizedDispatch"); + properties.add("lazyDispatch"); + properties.add("timeBeforeDispatchStarts"); + properties.add("consumersBeforeDispatchStarts"); + properties.add("allConsumersExclusiveByDefault"); + properties.add("persistJMSRedelivered"); + return properties; + } + + private Set getTopicPropertySet() { + Set properties = new HashSet<>(getDestPropertySet()); + properties.add("memoryLimit"); + properties.add("lazyDispatch"); + return properties; + } + + private Set getDestPropertySet() { + Set properties = new HashSet<>(); + properties.add("producerFlowControl"); + properties.add("alwaysRetroactive"); + properties.add("blockedProducerWarningInterval"); + properties.add("maxPageSize"); + properties.add("maxBrowsePageSize"); + properties.add("minimumMessageSize"); + properties.add("maxExpirePageSize"); + properties.add("cursorMemoryHighWaterMark"); + properties.add("storeUsageHighWaterMark"); + properties.add("gcInactiveDestinations"); + properties.add("gcWithNetworkConsumers"); + properties.add("inactiveTimeoutBeforeGC"); + properties.add("reduceMemoryFootprint"); + properties.add("doOptimizeMessageStore"); + properties.add("optimizeMessageStoreInFlightLimit"); + properties.add("advisoryForConsumed"); + properties.add("advisoryForDelivery"); + properties.add("advisoryForDiscardingMessages"); + properties.add("advisoryForSlowConsumers"); + properties.add("advisoryForFastProducers"); + properties.add("advisoryWhenFull"); + properties.add("includeBodyForAdvisory"); + properties.add("sendAdvisoryIfNoConsumers"); + return properties; + + } + + private void setAllQueuePolicyProperties(PolicyEntry entry, long memoryLimit, boolean useConsumerPriority, + boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch, + int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault, + boolean persistJMSRedelivered) { + + entry.setMemoryLimit(memoryLimit); + entry.setUseConsumerPriority(useConsumerPriority); + entry.setStrictOrderDispatch(strictOrderDispatch); + entry.setOptimizedDispatch(optimizedDispatch); + entry.setLazyDispatch(lazyDispatch); + entry.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts); + entry.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts); + entry.setAllConsumersExclusiveByDefault(allConsumersExclusiveByDefault); + entry.setPersistJMSRedelivered(persistJMSRedelivered); + } + + private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, boolean lazyDispatch) { + entry.setMemoryLimit(memoryLimit); + entry.setLazyDispatch(lazyDispatch); + } + + private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl, + boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, + int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, + long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, + int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, + boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, + boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { + + entry.setProducerFlowControl(producerFlowControl); + entry.setAlwaysRetroactive(alwaysRetroactive); + entry.setBlockedProducerWarningInterval(blockedProducerWarningInterval); + entry.setMaxPageSize(maxPageSize); + entry.setMaxBrowsePageSize(maxBrowsePageSize); + entry.setMinimumMessageSize(minimumMessageSize); + entry.setMaxExpirePageSize(maxExpirePageSize); + entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); + entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark); + entry.setGcInactiveDestinations(gcInactiveDestinations); + entry.setGcWithNetworkConsumers(gcWithNetworkConsumers); + entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC); + entry.setReduceMemoryFootprint(reduceMemoryFootprint); + entry.setDoOptimzeMessageStorage(doOptimizeMessageStore); + entry.setOptimizeMessageStoreInFlightLimit(optimizeMessageStoreInFlightLimit); + entry.setAdvisoryForConsumed(advisoryForConsumed); + entry.setAdvisoryForDelivery(advisoryForDelivery); + entry.setAdvisoryForDiscardingMessages(advisoryForDiscardingMessages); + entry.setAdvisoryForSlowConsumers(advisoryForSlowConsumers); + entry.setAdvisoryForFastProducers(advisoryForFastProducers); + entry.setAdvisoryWhenFull(advisoryWhenFull); + entry.setIncludeBodyForAdvisory(includeBodyForAdvisory); + entry.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); + } + + private void assertAllQueuePolicyProperties(Queue queue, long memoryLimit, boolean useConsumerPriority, + boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch, + int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault, + boolean persistJMSRedelivered) { + + assertEquals(memoryLimit, queue.getMemoryUsage().getLimit()); + assertEquals(useConsumerPriority, queue.isUseConsumerPriority()); + assertEquals(strictOrderDispatch, queue.isStrictOrderDispatch()); + assertEquals(optimizedDispatch, queue.isOptimizedDispatch()); + assertEquals(lazyDispatch, queue.isLazyDispatch()); + assertEquals(timeBeforeDispatchStarts, queue.getTimeBeforeDispatchStarts()); + assertEquals(consumersBeforeDispatchStarts, queue.getConsumersBeforeDispatchStarts()); + assertEquals(allConsumersExclusiveByDefault, queue.isAllConsumersExclusiveByDefault()); + assertEquals(persistJMSRedelivered, queue.isPersistJMSRedelivered()); + + } + + private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boolean lazyDispatch) { + assertEquals(memoryLimit, topic.getMemoryUsage().getLimit()); + assertEquals(lazyDispatch, topic.isLazyDispatch()); + } + + private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl, + boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, + int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, + long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, + int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, + boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, + boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { + + + assertEquals(producerFlowControl, dest.isProducerFlowControl()); + assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive()); + assertEquals(blockedProducerWarningInterval, dest.getBlockedProducerWarningInterval()); + assertEquals(maxPageSize, dest.getMaxPageSize()); + assertEquals(maxBrowsePageSize, dest.getMaxBrowsePageSize()); + assertEquals(minimumMessageSize, dest.getMinimumMessageSize()); + assertEquals(maxExpirePageSize, dest.getMaxExpirePageSize()); + assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark()); + assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark()); + assertEquals(gcInactiveDestinations, dest.isGcIfInactive()); + assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers()); + assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC()); + assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint()); + assertEquals(doOptimizeMessageStore, dest.isDoOptimzeMessageStorage()); + assertEquals(optimizeMessageStoreInFlightLimit, dest.getOptimizeMessageStoreInFlightLimit()); + assertEquals(advisoryForConsumed, dest.isAdvisoryForConsumed()); + assertEquals(advisoryForDelivery, dest.isAdvisoryForDelivery()); + assertEquals(advisoryForDiscardingMessages, dest.isAdvisoryForDiscardingMessages()); + assertEquals(advisoryForSlowConsumers, dest.isAdvisoryForSlowConsumers()); + assertEquals(advisoryForFastProducers, dest.isAdvisoryForFastProducers()); + assertEquals(advisoryWhenFull, dest.isAdvisoryWhenFull()); + assertEquals(includeBodyForAdvisory, dest.isIncludeBodyForAdvisory()); + assertEquals(sendAdvisoryIfNoConsumers, dest.isSendAdvisoryIfNoConsumers()); + + } }