The JavaRuntimeConfigurationBroker can now apply a subset of policy
properties retrospectively to existing destinations versus applying
all properties of the policy update.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-12-16 20:50:48 +00:00
parent b9dcb010f7
commit a253ad3c71
4 changed files with 637 additions and 56 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region.policy;
import java.util.Set;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination;
@ -135,19 +137,49 @@ public class PolicyEntry extends DestinationMapEntry {
}
public void update(Queue queue) {
baseUpdate(queue);
if (memoryLimit > 0) {
update(queue, null);
}
/**
* Update a queue with this policy. Only apply properties that
* match the includedProperties list. Not all properties are eligible
* to be updated.
*
* If includedProperties is null then all of the properties will be set as
* isUpdate will return true
* @param baseDestination
* @param includedProperties
*/
public void update(Queue queue, Set<String> includedProperties) {
baseUpdate(queue, includedProperties);
if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
queue.getMemoryUsage().setLimit(memoryLimit);
}
if (isUpdate("useConsumerPriority", includedProperties)) {
queue.setUseConsumerPriority(isUseConsumerPriority());
}
if (isUpdate("strictOrderDispatch", includedProperties)) {
queue.setStrictOrderDispatch(isStrictOrderDispatch());
}
if (isUpdate("optimizedDispatch", includedProperties)) {
queue.setOptimizedDispatch(isOptimizedDispatch());
}
if (isUpdate("lazyDispatch", includedProperties)) {
queue.setLazyDispatch(isLazyDispatch());
}
if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
}
if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
}
if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
}
if (isUpdate("persistJMSRedelivered", includedProperties)) {
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
}
}
public void configure(Broker broker,Topic topic) {
baseConfiguration(broker,topic);
@ -167,43 +199,101 @@ public class PolicyEntry extends DestinationMapEntry {
}
public void update(Topic topic) {
baseUpdate(topic);
if (memoryLimit > 0) {
update(topic, null);
}
//If includedProperties is null then all of the properties will be set as
//isUpdate will return true
public void update(Topic topic, Set<String> includedProperties) {
baseUpdate(topic, includedProperties);
if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
if (isUpdate("lazyDispatch", includedProperties)) {
topic.setLazyDispatch(isLazyDispatch());
}
}
// attributes that can change on the fly
public void baseUpdate(BaseDestination destination) {
baseUpdate(destination, null);
}
// attributes that can change on the fly
//If includedProperties is null then all of the properties will be set as
//isUpdate will return true
public void baseUpdate(BaseDestination destination, Set<String> includedProperties) {
if (isUpdate("producerFlowControl", includedProperties)) {
destination.setProducerFlowControl(isProducerFlowControl());
}
if (isUpdate("alwaysRetroactive", includedProperties)) {
destination.setAlwaysRetroactive(isAlwaysRetroactive());
}
if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
}
if (isUpdate("maxPageSize", includedProperties)) {
destination.setMaxPageSize(getMaxPageSize());
}
if (isUpdate("maxBrowsePageSize", includedProperties)) {
destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
}
if (isUpdate("minimumMessageSize", includedProperties)) {
destination.setMinimumMessageSize((int) getMinimumMessageSize());
}
if (isUpdate("maxExpirePageSize", includedProperties)) {
destination.setMaxExpirePageSize(getMaxExpirePageSize());
}
if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
}
if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
}
if (isUpdate("gcInactiveDestinations", includedProperties)) {
destination.setGcIfInactive(isGcInactiveDestinations());
}
if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
}
if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
}
if (isUpdate("reduceMemoryFootprint", includedProperties)) {
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
}
if (isUpdate("doOptimizeMessageStore", includedProperties)) {
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
}
if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) {
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
}
if (isUpdate("advisoryForConsumed", includedProperties)) {
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
}
if (isUpdate("advisoryForDelivery", includedProperties)) {
destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
}
if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
}
if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
}
if (isUpdate("advisoryForFastProducers", includedProperties)) {
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
}
if (isUpdate("advisoryWhenFull", includedProperties)) {
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
}
if (isUpdate("includeBodyForAdvisory", includedProperties)) {
destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
}
if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
}
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
baseUpdate(destination);
@ -321,6 +411,9 @@ public class PolicyEntry extends DestinationMapEntry {
}
}
private boolean isUpdate(String property, Set<String> includedProperties) {
return includedProperties == null || includedProperties.contains(property);
}
// Properties
// -------------------------------------------------------------------------
public DispatchPolicy getDispatchPolicy() {

View File

@ -17,6 +17,8 @@
package org.apache.activemq.plugin.java;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -133,7 +135,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
public void addNewPolicyEntry(PolicyEntry addition) {
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
existingMap.put(addition.getDestination(), addition);
applyRetrospectively(addition);
PolicyEntryUtil.applyRetrospectively(this, addition, null);
info("added policy for: " + addition.getDestination());
}
@ -156,6 +158,10 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
modifyPolicyEntry(existing, false);
}
public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) {
modifyPolicyEntry(existing, createOrReplace, null);
}
/**
* This method will modify an existing policy entry that matches the destination
* set on the PolicyEntry passed in. If createOrReplace is true, a new policy
@ -165,10 +171,16 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
* If createOrReplace is false, the policy update will only be applied if
* the PolicyEntry reference already exists in the PolicyMap.
*
* includedProperties is a list of properties that will be applied retrospectively. If
* the list is null, then all properties on the policy will be reapplied to the destination.
* This allows the ability to limit which properties are applied to existing destinations.
*
* @param existing
* @param createIfAbsent
* @param includedProperties - optional list of properties to apply retrospectively
*/
public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace) {
public void modifyPolicyEntry(PolicyEntry existing, boolean createOrReplace,
Set<String> includedProperties) {
PolicyMap existingMap = this.getBrokerService().getDestinationPolicy();
//First just look up by the destination type to see if anything matches
@ -194,7 +206,7 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
//Make sure that at this point the passed in object and the entry in
//the map are the same
if (existingEntry != null && existingEntry.equals(existing)) {
applyRetrospectively(existingEntry);
PolicyEntryUtil.applyRetrospectively(this, existingEntry, includedProperties);
this.info("updated policy for: " + existingEntry.getDestination());
} else {
throw new IllegalArgumentException("The policy can not be updated because it either does not exist or the PolicyEntry"
@ -204,10 +216,6 @@ public class JavaRuntimeConfigurationBroker extends AbstractRuntimeConfiguration
}
}
protected void applyRetrospectively(PolicyEntry updatedEntry) {
PolicyEntryUtil.applyRetrospectively(this, updatedEntry);
}
//authentication plugin
public void updateSimpleAuthenticationPlugin(final SimpleAuthenticationPlugin updatedPlugin) {
try {

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.plugin.util;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Queue;
@ -67,7 +69,27 @@ public class PolicyEntryUtil {
* @param runtimeBroker
* @param updatedEntry
*/
public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker, PolicyEntry updatedEntry) {
public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker,
PolicyEntry updatedEntry) {
PolicyEntryUtil.applyRetrospectively(runtimeBroker, updatedEntry, null);
}
/**
*
* Utility to properly apply an updated policy entry to all existing destinations that
* match this entry. The destination will only be updated if the policy is the exact
* policy (most specific) that matches the destination.
*
* The includedProperties List is optional and is used to specify a list of properties
* to apply retrospectively to the matching destinations. This allows only certain properties
* to be reapplied. If the list is null then all properties will be applied.
*
* @param runtimeBroker
* @param updatedEntry
* @param includedProperties
*/
public static void applyRetrospectively(AbstractRuntimeConfigurationBroker runtimeBroker,
PolicyEntry updatedEntry, Set<String> includedProperties) {
RegionBroker regionBroker = (RegionBroker) runtimeBroker.getBrokerService().getRegionBroker();
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
//Look up the policy that applies to the destination
@ -78,13 +100,15 @@ public class PolicyEntryUtil {
//currently just an identity check which is what we want
if (updatedEntry.equals(specificyPolicy)){
Destination target = destination;
if (destination instanceof DestinationFilter) {
target = ((DestinationFilter)destination).getNext();
while (target instanceof DestinationFilter) {
target = ((DestinationFilter)target).getNext();
}
//If we are providing a list of properties to set then use them
//to set eligible properties that are in the includedProperties list
if (target.getActiveMQDestination().isQueue()) {
updatedEntry.update((Queue) target);
updatedEntry.update((Queue) target, includedProperties);
} else if (target.getActiveMQDestination().isTopic()) {
updatedEntry.update((Topic) target);
updatedEntry.update((Topic) target, includedProperties);
}
runtimeBroker.debug("applied update to:" + target);
}

View File

@ -17,9 +17,12 @@
package org.apache.activemq.java;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.Session;
@ -29,6 +32,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RuntimeConfigTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
@ -53,6 +59,11 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
(JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
}
/**
* Test modifying a policy
*
* @throws Exception
*/
@Test
public void testMod() throws Exception {
BrokerService brokerService = new BrokerService();
@ -63,10 +74,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
//Reapply new limit
@ -80,6 +89,150 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
verifyQueueLimit("Before", 4194304);
}
/**
* Test modifying a policy but only applying a subset o
* properties retroactively to existing destinations
*
* @throws Exception
*/
@Test
public void testModFilterProperties() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024);
entry.setMaxPageSize(500);
entry.setMaxBrowsePageSize(100);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
assertEquals(500, getQueue("Before").getMaxPageSize());
assertEquals(100, getQueue("Before").getMaxBrowsePageSize());
//Reapply new limit, add the property to the list of included properties
entry.setMemoryLimit(4194304);
entry.setMaxPageSize(300);
entry.setMaxBrowsePageSize(200);
Set<String> properties = new HashSet<>();
properties.add("memoryLimit");
properties.add("maxPageSize");
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);
verifyQueueLimit("After", 4194304);
assertEquals(300, getQueue("After").getMaxPageSize());
assertEquals(200, getQueue("After").getMaxBrowsePageSize());
// change to existing dest, maxBrowsePageSize was not included
//in the property list so it should not have changed
verifyQueueLimit("Before", 4194304);
assertEquals(300, getQueue("Before").getMaxPageSize());
assertEquals(100, getQueue("Before").getMaxBrowsePageSize());
}
@Test
public void testModQueueAndTopic() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry qEntry = new PolicyEntry();
qEntry.setQueue(">");
qEntry.setPersistJMSRedelivered(true);
PolicyEntry tEntry = new PolicyEntry();
tEntry.setTopic(">");
tEntry.setLazyDispatch(true);
policyMap.setPolicyEntries(Arrays.asList(qEntry, tEntry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertEquals(true, getQueue("queueBefore").isPersistJMSRedelivered());
assertEquals(true, getTopic("topicBefore").isLazyDispatch());
//Reapply new limit, add the property to the list of included properties
qEntry.setPersistJMSRedelivered(false);
tEntry.setLazyDispatch(false);
Set<String> queueProperties = new HashSet<>();
queueProperties.add("persistJMSRedelivered");
Set<String> topicProperties = new HashSet<>();
topicProperties.add("lazyDispatch");
javaConfigBroker.modifyPolicyEntry(qEntry, false, queueProperties);
javaConfigBroker.modifyPolicyEntry(tEntry, false, topicProperties);
TimeUnit.SECONDS.sleep(SLEEP);
assertEquals(false, getQueue("queueBefore").isPersistJMSRedelivered());
assertEquals(false, getTopic("topicBefore").isLazyDispatch());
assertEquals(false, getQueue("queueAfter").isPersistJMSRedelivered());
assertEquals(false, getTopic("topicAfter").isLazyDispatch());
}
/**
* Test that a property that is not part of the update methods (can't be changed after creation)
* will not be applied to existing destinations but will be applied to new destinations
*
* @throws Exception
*/
@Test
public void testModFilterExcludedProperty() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setEnableAudit(true);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
assertTrue(getQueue("Before").isEnableAudit());
//Reapply new limit, add the property to the list of included properties
entry.setEnableAudit(false);
Set<String> properties = new HashSet<>();
properties.add("enableAudit");
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);
//no change because enableAudit is excluded
assertTrue(getQueue("Before").isEnableAudit());
//A new destination should have the property changed
assertFalse(getQueue("After").isEnableAudit());
}
@Test
public void testModFilterPropertiesInvalid() throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setMemoryLimit(1024);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
//use a property that doesn't exist, so nothing should be updated
entry.setMemoryLimit(4194304);
Set<String> properties = new HashSet<>();
properties.add("invalid");
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);
//This should be unchanged as the list of properties only
//has an invalid property so nothing will be re-applied retrospectively
verifyQueueLimit("Before", 1024);
//A new destination should be updated because the policy was changed
verifyQueueLimit("After", 4194304);
}
@Test
public void testModNewPolicyObject() throws Exception {
BrokerService brokerService = new BrokerService();
@ -114,6 +267,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
/**
* Test that a new policy is added and applied
* Test that a new policy will be added when setting createOrReplace to true
* when calling modifyPolicyEntry
*
* @throws Exception
*/
@ -143,6 +298,9 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
/**
* Test that a new policy is not added
* Pass a new policy to modifyPolicyEntry which should throw an exception
* because the policy didn't already exist
*
* @throws Exception
*/
@Test
@ -185,10 +343,8 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
//Reapply new limit with new object that matches
@ -369,6 +525,133 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
verifyTopicLimit("test2.test.after", 4000l);
}
@Test
public void testAllQueuePropertiesApplied() throws Exception {
testAllQueuePropertiesAppliedFilter(null);
}
/**
* Make sure all properties set on the filter Set are applied
*
* @throws Exception
*/
@Test
public void testAllQueuePropertiesAppliedFilter() throws Exception {
testAllQueuePropertiesAppliedFilter(getQueuePropertySet());
}
/**
* Make sure all properties set on the filter Set are applied
*
* @throws Exception
*/
@Test
public void testAllTopicPropertiesAppliedFilter() throws Exception {
testAllTopicPropertiesAppliedFilter(getTopicPropertySet());
}
@Test
public void testAllTopicPropertiesApplied() throws Exception {
testAllTopicPropertiesAppliedFilter(null);
}
private void testAllQueuePropertiesAppliedFilter(Set<String> properties) throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100,
100, true, true);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
//validate config
assertAllDestPolicyProperties(getQueue("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100,
100, true, true);
//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000,
1000, false, false);
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllDestPolicyProperties(getQueue("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000,
1000, false, false);
//check new dest
assertAllDestPolicyProperties(getQueue("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000,
1000, false, false);
}
private void testAllTopicPropertiesAppliedFilter(Set<String> properties) throws Exception {
BrokerService brokerService = new BrokerService();
PolicyMap policyMap = new PolicyMap();
PolicyEntry entry = new PolicyEntry();
entry.setTopic(">");
//initial config
setAllDestPolicyProperties(entry, true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
setAllTopicPolicyProperties(entry, 10000, true);
policyMap.setPolicyEntries(Arrays.asList(entry));
brokerService.setDestinationPolicy(policyMap);
startBroker(brokerService);
assertTrue("broker alive", brokerService.isStarted());
//validate config
assertAllDestPolicyProperties(getTopic("Before"), true, true, 10,
100, 200, 1000, 400, 40, 30, true, true, 1000, true, true,
30, true, true, true, true, true, true, true, true);
assertAllTopicPolicyProperties(getTopic("Before"), 10000, true);
//change config
setAllDestPolicyProperties(entry, false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
setAllTopicPolicyProperties(entry, 100000, false);
javaConfigBroker.modifyPolicyEntry(entry, false, properties);
TimeUnit.SECONDS.sleep(SLEEP);
assertAllDestPolicyProperties(getTopic("Before"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("Before"), 100000, false);
//check new dest
assertAllDestPolicyProperties(getTopic("After"), false, false, 100,
1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false,
300, false, false, false, false, false, false, false, false);
assertAllTopicPolicyProperties(getTopic("After"), 100000, false);
}
private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
try {
@ -376,7 +659,7 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit());
assertEquals(memoryLimit, getQueue(dest).getMemoryUsage().getLimit());
} finally {
connection.close();
}
@ -389,9 +672,182 @@ public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTopic(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit());
assertEquals(memoryLimit, getTopic(dest).getMemoryUsage().getLimit());
} finally {
connection.close();
}
}
private Queue getQueue(String queue) throws Exception {
return (Queue) brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(), new ActiveMQQueue(queue), false);
}
private Topic getTopic(String topic) throws Exception {
return (Topic) brokerService.getRegionBroker().addDestination(
brokerService.getAdminConnectionContext(), new ActiveMQTopic(topic), false);
}
private Set<String> getQueuePropertySet() {
Set<String> properties = new HashSet<>(getDestPropertySet());
properties.add("memoryLimit");
properties.add("useConsumerPriority");
properties.add("strictOrderDispatch");
properties.add("optimizedDispatch");
properties.add("lazyDispatch");
properties.add("timeBeforeDispatchStarts");
properties.add("consumersBeforeDispatchStarts");
properties.add("allConsumersExclusiveByDefault");
properties.add("persistJMSRedelivered");
return properties;
}
private Set<String> getTopicPropertySet() {
Set<String> properties = new HashSet<>(getDestPropertySet());
properties.add("memoryLimit");
properties.add("lazyDispatch");
return properties;
}
private Set<String> getDestPropertySet() {
Set<String> properties = new HashSet<>();
properties.add("producerFlowControl");
properties.add("alwaysRetroactive");
properties.add("blockedProducerWarningInterval");
properties.add("maxPageSize");
properties.add("maxBrowsePageSize");
properties.add("minimumMessageSize");
properties.add("maxExpirePageSize");
properties.add("cursorMemoryHighWaterMark");
properties.add("storeUsageHighWaterMark");
properties.add("gcInactiveDestinations");
properties.add("gcWithNetworkConsumers");
properties.add("inactiveTimeoutBeforeGC");
properties.add("reduceMemoryFootprint");
properties.add("doOptimizeMessageStore");
properties.add("optimizeMessageStoreInFlightLimit");
properties.add("advisoryForConsumed");
properties.add("advisoryForDelivery");
properties.add("advisoryForDiscardingMessages");
properties.add("advisoryForSlowConsumers");
properties.add("advisoryForFastProducers");
properties.add("advisoryWhenFull");
properties.add("includeBodyForAdvisory");
properties.add("sendAdvisoryIfNoConsumers");
return properties;
}
private void setAllQueuePolicyProperties(PolicyEntry entry, long memoryLimit, boolean useConsumerPriority,
boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch,
int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault,
boolean persistJMSRedelivered) {
entry.setMemoryLimit(memoryLimit);
entry.setUseConsumerPriority(useConsumerPriority);
entry.setStrictOrderDispatch(strictOrderDispatch);
entry.setOptimizedDispatch(optimizedDispatch);
entry.setLazyDispatch(lazyDispatch);
entry.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
entry.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
entry.setAllConsumersExclusiveByDefault(allConsumersExclusiveByDefault);
entry.setPersistJMSRedelivered(persistJMSRedelivered);
}
private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, boolean lazyDispatch) {
entry.setMemoryLimit(memoryLimit);
entry.setLazyDispatch(lazyDispatch);
}
private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
entry.setProducerFlowControl(producerFlowControl);
entry.setAlwaysRetroactive(alwaysRetroactive);
entry.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
entry.setMaxPageSize(maxPageSize);
entry.setMaxBrowsePageSize(maxBrowsePageSize);
entry.setMinimumMessageSize(minimumMessageSize);
entry.setMaxExpirePageSize(maxExpirePageSize);
entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark);
entry.setGcInactiveDestinations(gcInactiveDestinations);
entry.setGcWithNetworkConsumers(gcWithNetworkConsumers);
entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC);
entry.setReduceMemoryFootprint(reduceMemoryFootprint);
entry.setDoOptimzeMessageStorage(doOptimizeMessageStore);
entry.setOptimizeMessageStoreInFlightLimit(optimizeMessageStoreInFlightLimit);
entry.setAdvisoryForConsumed(advisoryForConsumed);
entry.setAdvisoryForDelivery(advisoryForDelivery);
entry.setAdvisoryForDiscardingMessages(advisoryForDiscardingMessages);
entry.setAdvisoryForSlowConsumers(advisoryForSlowConsumers);
entry.setAdvisoryForFastProducers(advisoryForFastProducers);
entry.setAdvisoryWhenFull(advisoryWhenFull);
entry.setIncludeBodyForAdvisory(includeBodyForAdvisory);
entry.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
}
private void assertAllQueuePolicyProperties(Queue queue, long memoryLimit, boolean useConsumerPriority,
boolean strictOrderDispatch, boolean optimizedDispatch, boolean lazyDispatch,
int timeBeforeDispatchStarts, int consumersBeforeDispatchStarts, boolean allConsumersExclusiveByDefault,
boolean persistJMSRedelivered) {
assertEquals(memoryLimit, queue.getMemoryUsage().getLimit());
assertEquals(useConsumerPriority, queue.isUseConsumerPriority());
assertEquals(strictOrderDispatch, queue.isStrictOrderDispatch());
assertEquals(optimizedDispatch, queue.isOptimizedDispatch());
assertEquals(lazyDispatch, queue.isLazyDispatch());
assertEquals(timeBeforeDispatchStarts, queue.getTimeBeforeDispatchStarts());
assertEquals(consumersBeforeDispatchStarts, queue.getConsumersBeforeDispatchStarts());
assertEquals(allConsumersExclusiveByDefault, queue.isAllConsumersExclusiveByDefault());
assertEquals(persistJMSRedelivered, queue.isPersistJMSRedelivered());
}
private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boolean lazyDispatch) {
assertEquals(memoryLimit, topic.getMemoryUsage().getLimit());
assertEquals(lazyDispatch, topic.isLazyDispatch());
}
private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl,
boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize,
int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark,
int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers,
long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore,
int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery,
boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, boolean advisoryForFastProducers,
boolean advisoryWhenFull, boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) {
assertEquals(producerFlowControl, dest.isProducerFlowControl());
assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive());
assertEquals(blockedProducerWarningInterval, dest.getBlockedProducerWarningInterval());
assertEquals(maxPageSize, dest.getMaxPageSize());
assertEquals(maxBrowsePageSize, dest.getMaxBrowsePageSize());
assertEquals(minimumMessageSize, dest.getMinimumMessageSize());
assertEquals(maxExpirePageSize, dest.getMaxExpirePageSize());
assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark());
assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark());
assertEquals(gcInactiveDestinations, dest.isGcIfInactive());
assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers());
assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC());
assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint());
assertEquals(doOptimizeMessageStore, dest.isDoOptimzeMessageStorage());
assertEquals(optimizeMessageStoreInFlightLimit, dest.getOptimizeMessageStoreInFlightLimit());
assertEquals(advisoryForConsumed, dest.isAdvisoryForConsumed());
assertEquals(advisoryForDelivery, dest.isAdvisoryForDelivery());
assertEquals(advisoryForDiscardingMessages, dest.isAdvisoryForDiscardingMessages());
assertEquals(advisoryForSlowConsumers, dest.isAdvisoryForSlowConsumers());
assertEquals(advisoryForFastProducers, dest.isAdvisoryForFastProducers());
assertEquals(advisoryWhenFull, dest.isAdvisoryWhenFull());
assertEquals(includeBodyForAdvisory, dest.isIncludeBodyForAdvisory());
assertEquals(sendAdvisoryIfNoConsumers, dest.isSendAdvisoryIfNoConsumers());
}
}