diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 4f487bfe45..ab47ea4e30 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -25,7 +25,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.jms.IllegalStateException; import javax.jms.JMSException; + +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.DestinationDoesNotExistException; @@ -64,6 +67,7 @@ public abstract class AbstractRegion implements Region { protected final SystemUsage usageManager; protected final DestinationFactory destinationFactory; protected final DestinationStatistics destinationStatistics; + protected final RegionStatistics regionStatistics = new RegionStatistics(); protected final RegionBroker broker; protected boolean autoCreateDestinations = true; protected final TaskRunnerFactory taskRunnerFactory; @@ -120,7 +124,16 @@ public abstract class AbstractRegion implements Region { } finally { destinationsLock.readLock().unlock(); } - destinations.clear(); + + destinationsLock.writeLock().lock(); + try { + destinations.clear(); + regionStatistics.getAdvisoryDestinations().reset(); + regionStatistics.getDestinations().reset(); + regionStatistics.getAllDestinations().reset(); + } finally { + destinationsLock.writeLock().unlock(); + } } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, @@ -131,6 +144,10 @@ public abstract class AbstractRegion implements Region { Destination dest = destinations.get(destination); if (dest == null) { if (destination.isTemporary() == false || createIfTemporary) { + // Limit the number of destinations that can be created if + // maxDestinations has been set on a policy + validateMaxDestinations(destination); + LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination); dest = createDestination(context, destination); // intercept if there is a valid interceptor defined @@ -140,6 +157,7 @@ public abstract class AbstractRegion implements Region { } dest.start(); destinations.put(destination, dest); + updateRegionDestCounts(destination, 1); destinationMap.put(destination, dest); addSubscriptionsForDestination(context, dest); } @@ -157,6 +175,61 @@ public abstract class AbstractRegion implements Region { return subscriptions; } + + /** + * Updates the counts in RegionStatistics based on whether or not the destination + * is an Advisory Destination or not + * + * @param destination the destination being used to determine which counters to update + * @param count the count to add to the counters + */ + protected void updateRegionDestCounts(ActiveMQDestination destination, int count) { + if (destination != null) { + if (AdvisorySupport.isAdvisoryTopic(destination)) { + regionStatistics.getAdvisoryDestinations().add(count); + } else { + regionStatistics.getDestinations().add(count); + } + regionStatistics.getAllDestinations().add(count); + } + } + + /** + * This method checks whether or not the destination can be created based on + * {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory + * topics are ignored. + * + * @param destination + * @throws Exception + */ + protected void validateMaxDestinations(ActiveMQDestination destination) + throws Exception { + if (broker.getDestinationPolicy() != null) { + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + // Make sure the destination is not an advisory topic + if (entry != null && entry.getMaxDestinations() >= 0 + && !AdvisorySupport.isAdvisoryTopic(destination)) { + // If there is an entry for this destination, look up the set of + // destinations associated with this policy + // If a destination isn't specified, then just count up + // non-advisory destinations (ie count all destinations) + int destinationSize = (int) (entry.getDestination() != null ? + destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount()); + if (destinationSize >= entry.getMaxDestinations()) { + if (entry.getDestination() != null) { + throw new IllegalStateException( + "The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() + + ") for the policy " + entry.getDestination() + " has already been reached."); + // No destination has been set (default policy) + } else { + throw new IllegalStateException("The maxmimum number of destinations allowed (" + + entry.getMaxDestinations() + ") has already been reached."); + } + } + } + } + } + protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { @@ -210,6 +283,8 @@ public abstract class AbstractRegion implements Region { try { Destination dest = destinations.remove(destination); if (dest != null) { + updateRegionDestCounts(destination, -1); + // timeout<0 or we timed out, we now force any remaining // subscriptions to un-subscribe. for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { @@ -620,7 +695,10 @@ public abstract class AbstractRegion implements Region { destination = destinationInterceptor.intercept(destination); } getDestinationMap().put(key, destination); - destinations.put(key, destination); + Destination prev = destinations.put(key, destination); + if (prev == null) { + updateRegionDestCounts(key, 1); + } } } finally { destinationsLock.writeLock().unlock(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java new file mode 100644 index 0000000000..d39a4f1214 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionStatistics.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region; + +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.management.StatsImpl; + +/** + * The J2EE Statistics for the Connection. + * + * + */ +public class RegionStatistics extends StatsImpl { + + private CountStatisticImpl advisoryDestinations; + private CountStatisticImpl destinations; + private CountStatisticImpl allDestinations; + + public RegionStatistics() { + this(true); + } + + public RegionStatistics(boolean enabled) { + + advisoryDestinations = new CountStatisticImpl("advisoryTopics", "The number of advisory destinations in the region"); + destinations = new CountStatisticImpl("destinations", "The number of regular (non-adivsory) destinations in the region"); + allDestinations = new CountStatisticImpl("allDestinations", "The total number of destinations, including advisory destinations, in the region"); + + addStatistic("advisoryDestinations", advisoryDestinations); + addStatistic("destinations", destinations); + addStatistic("allDestinations", allDestinations); + + this.setEnabled(enabled); + } + + public CountStatisticImpl getAdvisoryDestinations() { + return advisoryDestinations; + } + + public CountStatisticImpl getDestinations() { + return destinations; + } + + public CountStatisticImpl getAllDestinations() { + return allDestinations; + } + + public void reset() { + super.reset(); + advisoryDestinations.reset(); + destinations.reset(); + allDestinations.reset(); + } + + public void setEnabled(boolean enabled) { + super.setEnabled(enabled); + advisoryDestinations.setEnabled(enabled); + destinations.setEnabled(enabled); + allDestinations.setEnabled(enabled); + } + + public void setParent(RegionStatistics parent) { + if (parent != null) { + advisoryDestinations.setParent(parent.getAdvisoryDestinations()); + destinations.setParent(parent.getDestinations()); + allDestinations.setParent(parent.getAllDestinations()); + } else { + advisoryDestinations.setParent(null); + destinations.setParent(null); + allDestinations.setParent(null); + } + } + +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 41b77b209c..26cfa6b156 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -99,6 +99,8 @@ public class PolicyEntry extends DestinationMapEntry { private boolean reduceMemoryFootprint; private NetworkBridgeFilterFactory networkBridgeFilterFactory; private boolean doOptimzeMessageStorage = true; + private int maxDestinations = -1; + /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -962,4 +964,19 @@ public class PolicyEntry extends DestinationMapEntry { public boolean isPersistJMSRedelivered() { return persistJMSRedelivered; } + + public int getMaxDestinations() { + return maxDestinations; + } + + /** + * Sets the maximum number of destinations that can be created + * + * @param maxDestinations + * maximum number of destinations + */ + public void setMaxDestinations(int maxDestinations) { + this.maxDestinations = maxDestinations; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java new file mode 100644 index 0000000000..714da18732 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MaxDestinationsPolicyTest.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * This unit test is to test that setting the property "maxDestinations" on + * PolicyEntry works correctly. If this property is set, it will limit the + * number of destinations that can be created. Advisory topics will be ignored + * during calculations. + * + */ +public class MaxDestinationsPolicyTest { + BrokerService broker; + ConnectionFactory factory; + Connection connection; + Session session; + MessageProducer producer; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + + File testDataDir = new File("target/activemq-data/AMQ-5751"); + broker.setDataDirectoryFile(testDataDir); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); + broker.setPersistenceAdapter(persistenceAdapter); + broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory(broker.getTransportConnectors() + .get(0).getConnectUri().toString()); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @After + public void tearDown() throws Exception { + session.close(); + connection.stop(); + connection.close(); + broker.stop(); + } + + /** + * Test that 10 queues can be created when default policy allows it. + */ + @Test + public void testMaxDestinationDefaultPolicySuccess() throws Exception { + applyDefaultMaximumDestinationPolicy(10); + + for (int i = 0; i < 10; i++) { + createQueue("queue." + i); + } + } + + /** + * Test that default policy prevents going beyond max + */ + @Test(expected = javax.jms.IllegalStateException.class) + public void testMaxDestinationDefaultPolicyFail() throws Exception { + applyDefaultMaximumDestinationPolicy(10); + + for (int i = 0; i < 11; i++) { + createQueue("queue." + i); + } + } + + /** + * Test that a queue policy overrides the default policy + */ + @Test(expected = javax.jms.IllegalStateException.class) + public void testMaxDestinationOnQueuePolicy() throws Exception { + PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10); + applyMaximumDestinationPolicy(policyMap, new ActiveMQQueue("queue.>"), + 5); + + // This should fail even though the default policy is set to a limit of + // 10 because the + // queue policy overrides it + for (int i = 0; i < 6; i++) { + createQueue("queue." + i); + } + } + + /** + * Test that 10 topics can be created when default policy allows it. + */ + @Test + public void testTopicMaxDestinationDefaultPolicySuccess() throws Exception { + applyDefaultMaximumDestinationPolicy(10); + + for (int i = 0; i < 10; i++) { + createTopic("topic." + i); + } + } + + /** + * Test that topic creation will faill when exceeding the limit + */ + @Test(expected = javax.jms.IllegalStateException.class) + public void testTopicMaxDestinationDefaultPolicyFail() throws Exception { + applyDefaultMaximumDestinationPolicy(20); + + for (int i = 0; i < 21; i++) { + createTopic("topic." + i); + } + } + + /** + * Test that no limit is enforced + */ + @Test + public void testTopicDefaultPolicyNoMaxDestinations() throws Exception { + // -1 is the default and signals no max destinations + applyDefaultMaximumDestinationPolicy(-1); + for (int i = 0; i < 100; i++) { + createTopic("topic." + i); + } + } + + /** + * Test a mixture of queue and topic policies + */ + @Test + public void testComplexMaxDestinationPolicy() throws Exception { + PolicyMap policyMap = applyMaximumDestinationPolicy(new PolicyMap(), + new ActiveMQQueue("queue.>"), 5); + applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"), + 10); + + for (int i = 0; i < 5; i++) { + createQueue("queue." + i); + } + + for (int i = 0; i < 10; i++) { + createTopic("topic." + i); + } + + // Make sure that adding one more of either a topic or a queue fails + boolean fail = false; + try { + createTopic("topic.test"); + } catch (javax.jms.IllegalStateException e) { + fail = true; + } + assertTrue(fail); + + fail = false; + try { + createQueue("queue.test"); + } catch (javax.jms.IllegalStateException e) { + fail = true; + } + assertTrue(fail); + } + + /** + * Test child destinations of a policy + */ + @Test + public void testMaxDestinationPolicyOnChildDests() throws Exception { + applyMaximumDestinationPolicy(new PolicyMap(), new ActiveMQTopic( + "topic.>"), 10); + + for (int i = 0; i < 10; i++) { + createTopic("topic.test" + i); + } + + // Make sure that adding one more fails + boolean fail = false; + try { + createTopic("topic.abc.test"); + } catch (javax.jms.IllegalStateException e) { + fail = true; + } + assertTrue(fail); + + } + + /** + * Test a topic policy overrides the default + */ + @Test(expected = javax.jms.IllegalStateException.class) + public void testMaxDestinationOnTopicPolicy() throws Exception { + PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10); + applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"), + 5); + + // This should fail even though the default policy is set to a limit of + // 10 because the + // queue policy overrides it + for (int i = 0; i < 6; i++) { + createTopic("topic." + i); + } + } + + private PolicyMap applyMaximumDestinationPolicy(PolicyMap policyMap, + ActiveMQDestination destination, int maxDestinations) { + PolicyEntry entry = new PolicyEntry(); + entry.setDestination(destination); + entry.setMaxDestinations(maxDestinations); + policyMap.setPolicyEntries(Lists.newArrayList(entry)); + broker.setDestinationPolicy(policyMap); + return policyMap; + } + + private PolicyMap applyDefaultMaximumDestinationPolicy(int maxDestinations) { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + if (maxDestinations >= 0) { + defaultEntry.setMaxDestinations(maxDestinations); + } + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + return policyMap; + } + + private void createQueue(String queueName) throws Exception { + Queue queue = session.createQueue(queueName); + producer = session.createProducer(queue); + } + + private void createTopic(String topicName) throws Exception { + Topic topic = session.createTopic(topicName); + producer = session.createProducer(topic); + } + +}