diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 97f4b3e2b7..9302804f91 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.QueueBrowserSubscription; import org.apache.activemq.broker.region.QueueSubscription; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; @@ -128,6 +129,20 @@ public class PolicyEntry extends DestinationMapEntry { queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); } + public void update(Queue queue) { + baseUpdate(queue); + if (memoryLimit > 0) { + queue.getMemoryUsage().setLimit(memoryLimit); + } + queue.setUseConsumerPriority(isUseConsumerPriority()); + queue.setStrictOrderDispatch(isStrictOrderDispatch()); + queue.setOptimizedDispatch(isOptimizedDispatch()); + queue.setLazyDispatch(isLazyDispatch()); + queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); + queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); + queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); + } + public void configure(Broker broker,Topic topic) { baseConfiguration(broker,topic); if (dispatchPolicy != null) { @@ -145,28 +160,51 @@ public class PolicyEntry extends DestinationMapEntry { topic.setLazyDispatch(isLazyDispatch()); } - public void baseConfiguration(Broker broker, BaseDestination destination) { + public void update(Topic topic) { + baseUpdate(topic); + if (memoryLimit > 0) { + topic.getMemoryUsage().setLimit(memoryLimit); + } + topic.setLazyDispatch(isLazyDispatch()); + } + + // attributes that can change on the fly + public void baseUpdate(BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setAlwaysRetroactive(isAlwaysRetroactive()); destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); - destination.setEnableAudit(isEnableAudit()); - destination.setMaxAuditDepth(getMaxQueueAuditDepth()); - destination.setMaxProducersToAudit(getMaxProducersToAudit()); + destination.setMaxPageSize(getMaxPageSize()); destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); - destination.setUseCache(isUseCache()); + destination.setMinimumMessageSize((int) getMinimumMessageSize()); + destination.setMaxExpirePageSize(getMaxExpirePageSize()); + destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); + + destination.setGcIfInactive(isGcInactiveDestinations()); + destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); + destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC()); + destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); + destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); + destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); + destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); - destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); + destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); + } + + public void baseConfiguration(Broker broker, BaseDestination destination) { + baseUpdate(destination); + destination.setEnableAudit(isEnableAudit()); + destination.setMaxAuditDepth(getMaxQueueAuditDepth()); + destination.setMaxProducersToAudit(getMaxProducersToAudit()); + destination.setUseCache(isUseCache()); destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); - destination.setMaxExpirePageSize(getMaxExpirePageSize()); - destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); - destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); SlowConsumerStrategy scs = getSlowConsumerStrategy(); if (scs != null) { scs.setBrokerService(broker); @@ -174,12 +212,6 @@ public class PolicyEntry extends DestinationMapEntry { } destination.setSlowConsumerStrategy(scs); destination.setPrioritizedMessages(isPrioritizedMessages()); - destination.setGcIfInactive(isGcInactiveDestinations()); - destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); - destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC()); - destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); - destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); - destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -872,4 +904,5 @@ public class PolicyEntry extends DestinationMapEntry { public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; } + } diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java index 67bae0f8f3..5e73a2a150 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,7 +46,11 @@ import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.virtual.CompositeQueue; import org.apache.activemq.broker.region.virtual.CompositeTopic; import org.apache.activemq.broker.region.virtual.VirtualDestination; @@ -62,6 +67,8 @@ import org.apache.activemq.schema.core.DtoBroker; import org.apache.activemq.schema.core.DtoCompositeQueue; import org.apache.activemq.schema.core.DtoCompositeTopic; import org.apache.activemq.schema.core.DtoNetworkConnector; +import org.apache.activemq.schema.core.DtoPolicyEntry; +import org.apache.activemq.schema.core.DtoPolicyMap; import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor; import org.apache.activemq.schema.core.DtoVirtualTopic; import org.apache.activemq.security.AuthorizationBroker; @@ -231,6 +238,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { private void processSelectiveChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) { for (Class upDatable : new Class[]{ + DtoBroker.DestinationPolicy.class, DtoBroker.NetworkConnectors.class, DtoBroker.DestinationInterceptors.class, DtoBroker.Plugins.class}) { @@ -243,6 +251,13 @@ public class RuntimeConfigurationBroker extends BrokerFilter { List current = filter(currentConfiguration, upDatable); List modified = filter(modifiedConfiguration, upDatable); + if (current.equals(modified)) { + LOG.debug("no changes to " + upDatable.getSimpleName()); + return; + } else { + info("changes to " + upDatable.getSimpleName()); + } + int modIndex = 0, currentIndex = 0; for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) { // walk the list for mods @@ -267,12 +282,18 @@ public class RuntimeConfigurationBroker extends BrokerFilter { // mapping all supported updatable elements to support getContents private List getContents(Object o) { + List answer = new ArrayList(); try { - return (List) o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{}); + Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{}); + if (val instanceof List) { + answer = (List) val; + } else { + answer.add(val); + } } catch (Exception e) { info("Failed to access getContents for " + o + ", runtime modifications not supported", e); } - return new ArrayList(); + return answer; } private void applyModifications(List current, List modification) { @@ -297,6 +318,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter { private void modify(Object existing, Object candidate) { if (candidate instanceof DtoAuthorizationPlugin) { + try { // replace authorization map - need exclusive write lock to total broker AuthorizationBroker authorizationBroker = @@ -306,12 +328,47 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } catch (Exception e) { info("failed to apply modified AuthorizationMap to AuthorizationBroker", e); } + + } else if (candidate instanceof DtoPolicyMap) { + + List existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class); + List candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class); + // walk the map for mods + applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0))); + + } else if (candidate instanceof DtoPolicyEntry) { + + PolicyMap existingMap = getBrokerService().getDestinationPolicy(); + + PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry()); + + Set existingEntry = existingMap.get(updatedEntry.getDestination()); + if (existingEntry.size() == 1) { + updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next()); + applyRetrospectively(updatedEntry); + info("updated policy for: " + updatedEntry.getDestination()); + } else { + info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination()); + } + } else { remove(existing); addNew(candidate); } } + private void applyRetrospectively(PolicyEntry updatedEntry) { + RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker(); + for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { + if (destination.getActiveMQDestination().isQueue()) { + updatedEntry.update((Queue) destination); + } else if (destination.getActiveMQDestination().isTopic()) { + updatedEntry.update((Topic) destination); + } + LOG.debug("applied update to:" + destination); + } + } + private AuthorizationMap fromDto(List map) { XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap(); for (Object o : map) { @@ -452,8 +509,16 @@ public class RuntimeConfigurationBroker extends BrokerFilter { } } }); + } else if (o instanceof DtoPolicyEntry) { + + PolicyEntry addition = fromDto(o, new PolicyEntry()); + PolicyMap existingMap = getBrokerService().getDestinationPolicy(); + existingMap.put(addition.getDestination(), addition); + applyRetrospectively(addition); + info("added policy for: " + addition.getDestination()); + } else { - info("No runtime support for modifications to " + o); + info("No runtime support for additions of " + o); } } diff --git a/activemq-runtime-config/src/main/resources/binding.xjb b/activemq-runtime-config/src/main/resources/binding.xjb index cff5f9ddf1..e8ba555c6e 100644 --- a/activemq-runtime-config/src/main/resources/binding.xjb +++ b/activemq-runtime-config/src/main/resources/binding.xjb @@ -27,6 +27,22 @@ + + + + + + + + + + + + + + + + @@ -43,7 +59,7 @@ - + diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java new file mode 100644 index 0000000000..5374c49873 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Session; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PolicyEntryTest extends RuntimeConfigTestSupport { + + String configurationSeed = "policyEntryTest"; + + @Test + public void testMod() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-mod", SLEEP); + verifyQueueLimit("After", 2048); + + // change to existing dest + verifyQueueLimit("Before", 2048); + } + + @Test + public void testAddNdMod() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit()); + + applyNewConfig(brokerConfig, configurationSeed + "-policy-ml-add", SLEEP); + + verifyTopicLimit("After", 2048l); + verifyQueueLimit("After", 2048); + + // change to existing dest + verifyTopicLimit("Before", 2048l); + } + + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue(dest)); + + assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit()); + } finally { + connection.close(); + } + } + + private void verifyTopicLimit(String dest, long memoryLimit) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createTopic(dest)); + + assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit()); + } finally { + connection.close(); + } + } +} diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-add.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-add.xml new file mode 100644 index 0000000000..b7ddb268ba --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-add.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-mod.xml new file mode 100644 index 0000000000..3333bf8155 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml-mod.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml.xml new file mode 100644 index 0000000000..21884ab579 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-ml.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + +