https://issues.apache.org/jira/browse/AMQ-4682 - add support for mods and additions to destination policy - primitive attributes only

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1518009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-08-27 22:02:07 +00:00
parent 8b61c3919c
commit e8ea6cd95c
7 changed files with 332 additions and 19 deletions

View File

@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueBrowserSubscription;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
@ -128,6 +129,20 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
}
public void update(Queue queue) {
baseUpdate(queue);
if (memoryLimit > 0) {
queue.getMemoryUsage().setLimit(memoryLimit);
}
queue.setUseConsumerPriority(isUseConsumerPriority());
queue.setStrictOrderDispatch(isStrictOrderDispatch());
queue.setOptimizedDispatch(isOptimizedDispatch());
queue.setLazyDispatch(isLazyDispatch());
queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
}
public void configure(Broker broker,Topic topic) {
baseConfiguration(broker,topic);
if (dispatchPolicy != null) {
@ -145,28 +160,51 @@ public class PolicyEntry extends DestinationMapEntry {
topic.setLazyDispatch(isLazyDispatch());
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
public void update(Topic topic) {
baseUpdate(topic);
if (memoryLimit > 0) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setLazyDispatch(isLazyDispatch());
}
// attributes that can change on the fly
public void baseUpdate(BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
destination.setAlwaysRetroactive(isAlwaysRetroactive());
destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
destination.setMaxPageSize(getMaxPageSize());
destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
destination.setUseCache(isUseCache());
destination.setMinimumMessageSize((int) getMinimumMessageSize());
destination.setMaxExpirePageSize(getMaxExpirePageSize());
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
destination.setGcIfInactive(isGcInactiveDestinations());
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
baseUpdate(destination);
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
destination.setUseCache(isUseCache());
destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
destination.setMaxExpirePageSize(getMaxExpirePageSize());
destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
SlowConsumerStrategy scs = getSlowConsumerStrategy();
if (scs != null) {
scs.setBrokerService(broker);
@ -174,12 +212,6 @@ public class PolicyEntry extends DestinationMapEntry {
}
destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages());
destination.setGcIfInactive(isGcInactiveDestinations());
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -872,4 +904,5 @@ public class PolicyEntry extends DestinationMapEntry {
public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -45,7 +46,11 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
@ -62,6 +67,8 @@ import org.apache.activemq.schema.core.DtoBroker;
import org.apache.activemq.schema.core.DtoCompositeQueue;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoNetworkConnector;
import org.apache.activemq.schema.core.DtoPolicyEntry;
import org.apache.activemq.schema.core.DtoPolicyMap;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
import org.apache.activemq.schema.core.DtoVirtualTopic;
import org.apache.activemq.security.AuthorizationBroker;
@ -231,6 +238,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) {
for (Class upDatable : new Class[]{
DtoBroker.DestinationPolicy.class,
DtoBroker.NetworkConnectors.class,
DtoBroker.DestinationInterceptors.class,
DtoBroker.Plugins.class}) {
@ -243,6 +251,13 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
List current = filter(currentConfiguration, upDatable);
List modified = filter(modifiedConfiguration, upDatable);
if (current.equals(modified)) {
LOG.debug("no changes to " + upDatable.getSimpleName());
return;
} else {
info("changes to " + upDatable.getSimpleName());
}
int modIndex = 0, currentIndex = 0;
for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
// walk the list for mods
@ -267,12 +282,18 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
// mapping all supported updatable elements to support getContents
private List<Object> getContents(Object o) {
List<Object> answer = new ArrayList<Object>();
try {
return (List<Object>) o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
if (val instanceof List) {
answer = (List<Object>) val;
} else {
answer.add(val);
}
} catch (Exception e) {
info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
}
return new ArrayList<Object>();
return answer;
}
private void applyModifications(List<Object> current, List<Object> modification) {
@ -297,6 +318,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private void modify(Object existing, Object candidate) {
if (candidate instanceof DtoAuthorizationPlugin) {
try {
// replace authorization map - need exclusive write lock to total broker
AuthorizationBroker authorizationBroker =
@ -306,12 +328,47 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
} catch (Exception e) {
info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
}
} else if (candidate instanceof DtoPolicyMap) {
List<Object> existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class);
List<Object> candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class);
// walk the map for mods
applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0)));
} else if (candidate instanceof DtoPolicyEntry) {
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
Set existingEntry = existingMap.get(updatedEntry.getDestination());
if (existingEntry.size() == 1) {
updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
applyRetrospectively(updatedEntry);
info("updated policy for: " + updatedEntry.getDestination());
} else {
info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
}
} else {
remove(existing);
addNew(candidate);
}
}
private void applyRetrospectively(PolicyEntry updatedEntry) {
RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
if (destination.getActiveMQDestination().isQueue()) {
updatedEntry.update((Queue) destination);
} else if (destination.getActiveMQDestination().isTopic()) {
updatedEntry.update((Topic) destination);
}
LOG.debug("applied update to:" + destination);
}
}
private AuthorizationMap fromDto(List<Object> map) {
XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap();
for (Object o : map) {
@ -452,8 +509,16 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
});
} else if (o instanceof DtoPolicyEntry) {
PolicyEntry addition = fromDto(o, new PolicyEntry());
PolicyMap existingMap = getBrokerService().getDestinationPolicy();
existingMap.put(addition.getDestination(), addition);
applyRetrospectively(addition);
info("added policy for: " + addition.getDestination());
} else {
info("No runtime support for modifications to " + o);
info("No runtime support for additions of " + o);
}
}

View File

@ -27,6 +27,22 @@
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice/xs:choice/xs:element[@name='destinationPolicy']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='policyMap']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='policyMap']/xs:complexType/xs:choice/xs:choice/xs:element[@name='policyEntries']/xs:complexType/xs:sequence">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='policyMap']/xs:complexType/xs:choice/xs:choice/xs:element[@name='defaultEntry']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='virtualDestinationInterceptor']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
@ -43,7 +59,7 @@
<jxb:property name="Contents" />
</jxb:bindings>
<jxb:bindings node="xs:element[@name='authorizationMap']/xs:complexType/xs:choice/xs:choice/xs:element[@name='authorizationEntries']/xs:complexType/xs:sequence">
<jxb:bindings node="xs:element/xs:complexType/xs:choice/xs:choice/xs:element[@name='authorizationEntries']/xs:complexType/xs:sequence">
<jxb:property name="Contents" />
</jxb:bindings>

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PolicyEntryTest extends RuntimeConfigTestSupport {
String configurationSeed = "policyEntryTest";
@Test
public void testMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-mod", SLEEP);
verifyQueueLimit("After", 2048);
// change to existing dest
verifyQueueLimit("Before", 2048);
}
@Test
public void testAddNdMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
verifyQueueLimit("Before", 1024);
verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit());
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-add", SLEEP);
verifyTopicLimit("After", 2048l);
verifyQueueLimit("After", 2048);
// change to existing dest
verifyTopicLimit("Before", 2048l);
}
private void verifyQueueLimit(String dest, int memoryLimit) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit());
} finally {
connection.close();
}
}
private void verifyTopicLimit(String dest, long memoryLimit) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createTopic(dest));
assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit());
} finally {
connection.close();
}
}
}

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" memoryLimit="2048"/>
<policyEntry topic=">" memoryLimit="2048"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" memoryLimit="2048"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" memoryLimit="1024"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>