Adding a configuration option to PolicyEntry to enable setting the maximum number of created destinations by policy on the broker.

This resolves https://issues.apache.org/jira/browse/AMQ-5751
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-05-01 19:02:18 +00:00 committed by Timothy Bish
parent eaf5c12151
commit 886e2d4d97
4 changed files with 457 additions and 2 deletions

View File

@ -25,7 +25,10 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.DestinationDoesNotExistException;
@ -64,6 +67,7 @@ public abstract class AbstractRegion implements Region {
protected final SystemUsage usageManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionStatistics regionStatistics = new RegionStatistics();
protected final RegionBroker broker;
protected boolean autoCreateDestinations = true;
protected final TaskRunnerFactory taskRunnerFactory;
@ -120,7 +124,16 @@ public abstract class AbstractRegion implements Region {
} finally {
destinationsLock.readLock().unlock();
}
destinationsLock.writeLock().lock();
try {
destinations.clear();
regionStatistics.getAdvisoryDestinations().reset();
regionStatistics.getDestinations().reset();
regionStatistics.getAllDestinations().reset();
} finally {
destinationsLock.writeLock().unlock();
}
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
@ -131,6 +144,10 @@ public abstract class AbstractRegion implements Region {
Destination dest = destinations.get(destination);
if (dest == null) {
if (destination.isTemporary() == false || createIfTemporary) {
// Limit the number of destinations that can be created if
// maxDestinations has been set on a policy
validateMaxDestinations(destination);
LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
@ -140,6 +157,7 @@ public abstract class AbstractRegion implements Region {
}
dest.start();
destinations.put(destination, dest);
updateRegionDestCounts(destination, 1);
destinationMap.put(destination, dest);
addSubscriptionsForDestination(context, dest);
}
@ -157,6 +175,61 @@ public abstract class AbstractRegion implements Region {
return subscriptions;
}
/**
* Updates the counts in RegionStatistics based on whether or not the destination
* is an Advisory Destination or not
*
* @param destination the destination being used to determine which counters to update
* @param count the count to add to the counters
*/
protected void updateRegionDestCounts(ActiveMQDestination destination, int count) {
if (destination != null) {
if (AdvisorySupport.isAdvisoryTopic(destination)) {
regionStatistics.getAdvisoryDestinations().add(count);
} else {
regionStatistics.getDestinations().add(count);
}
regionStatistics.getAllDestinations().add(count);
}
}
/**
* This method checks whether or not the destination can be created based on
* {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory
* topics are ignored.
*
* @param destination
* @throws Exception
*/
protected void validateMaxDestinations(ActiveMQDestination destination)
throws Exception {
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
// Make sure the destination is not an advisory topic
if (entry != null && entry.getMaxDestinations() >= 0
&& !AdvisorySupport.isAdvisoryTopic(destination)) {
// If there is an entry for this destination, look up the set of
// destinations associated with this policy
// If a destination isn't specified, then just count up
// non-advisory destinations (ie count all destinations)
int destinationSize = (int) (entry.getDestination() != null ?
destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
if (destinationSize >= entry.getMaxDestinations()) {
if (entry.getDestination() != null) {
throw new IllegalStateException(
"The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() +
") for the policy " + entry.getDestination() + " has already been reached.");
// No destination has been set (default policy)
} else {
throw new IllegalStateException("The maxmimum number of destinations allowed ("
+ entry.getMaxDestinations() + ") has already been reached.");
}
}
}
}
}
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
throws Exception {
@ -210,6 +283,8 @@ public abstract class AbstractRegion implements Region {
try {
Destination dest = destinations.remove(destination);
if (dest != null) {
updateRegionDestCounts(destination, -1);
// timeout<0 or we timed out, we now force any remaining
// subscriptions to un-subscribe.
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
@ -620,7 +695,10 @@ public abstract class AbstractRegion implements Region {
destination = destinationInterceptor.intercept(destination);
}
getDestinationMap().put(key, destination);
destinations.put(key, destination);
Destination prev = destinations.put(key, destination);
if (prev == null) {
updateRegionDestCounts(key, 1);
}
}
} finally {
destinationsLock.writeLock().unlock();

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
/**
* The J2EE Statistics for the Connection.
*
*
*/
public class RegionStatistics extends StatsImpl {
private CountStatisticImpl advisoryDestinations;
private CountStatisticImpl destinations;
private CountStatisticImpl allDestinations;
public RegionStatistics() {
this(true);
}
public RegionStatistics(boolean enabled) {
advisoryDestinations = new CountStatisticImpl("advisoryTopics", "The number of advisory destinations in the region");
destinations = new CountStatisticImpl("destinations", "The number of regular (non-adivsory) destinations in the region");
allDestinations = new CountStatisticImpl("allDestinations", "The total number of destinations, including advisory destinations, in the region");
addStatistic("advisoryDestinations", advisoryDestinations);
addStatistic("destinations", destinations);
addStatistic("allDestinations", allDestinations);
this.setEnabled(enabled);
}
public CountStatisticImpl getAdvisoryDestinations() {
return advisoryDestinations;
}
public CountStatisticImpl getDestinations() {
return destinations;
}
public CountStatisticImpl getAllDestinations() {
return allDestinations;
}
public void reset() {
super.reset();
advisoryDestinations.reset();
destinations.reset();
allDestinations.reset();
}
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
advisoryDestinations.setEnabled(enabled);
destinations.setEnabled(enabled);
allDestinations.setEnabled(enabled);
}
public void setParent(RegionStatistics parent) {
if (parent != null) {
advisoryDestinations.setParent(parent.getAdvisoryDestinations());
destinations.setParent(parent.getDestinations());
allDestinations.setParent(parent.getAllDestinations());
} else {
advisoryDestinations.setParent(null);
destinations.setParent(null);
allDestinations.setParent(null);
}
}
}

View File

@ -99,6 +99,8 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean reduceMemoryFootprint;
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@ -962,4 +964,19 @@ public class PolicyEntry extends DestinationMapEntry {
public boolean isPersistJMSRedelivered() {
return persistJMSRedelivered;
}
public int getMaxDestinations() {
return maxDestinations;
}
/**
* Sets the maximum number of destinations that can be created
*
* @param maxDestinations
* maximum number of destinations
*/
public void setMaxDestinations(int maxDestinations) {
this.maxDestinations = maxDestinations;
}
}

View File

@ -0,0 +1,271 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import static org.junit.Assert.assertTrue;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* This unit test is to test that setting the property "maxDestinations" on
* PolicyEntry works correctly. If this property is set, it will limit the
* number of destinations that can be created. Advisory topics will be ignored
* during calculations.
*
*/
public class MaxDestinationsPolicyTest {
BrokerService broker;
ConnectionFactory factory;
Connection connection;
Session session;
MessageProducer producer;
@Before
public void setUp() throws Exception {
broker = new BrokerService();
File testDataDir = new File("target/activemq-data/AMQ-5751");
broker.setDataDirectoryFile(testDataDir);
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
broker.setPersistenceAdapter(persistenceAdapter);
broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
.get(0).getConnectUri().toString());
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@After
public void tearDown() throws Exception {
session.close();
connection.stop();
connection.close();
broker.stop();
}
/**
* Test that 10 queues can be created when default policy allows it.
*/
@Test
public void testMaxDestinationDefaultPolicySuccess() throws Exception {
applyDefaultMaximumDestinationPolicy(10);
for (int i = 0; i < 10; i++) {
createQueue("queue." + i);
}
}
/**
* Test that default policy prevents going beyond max
*/
@Test(expected = javax.jms.IllegalStateException.class)
public void testMaxDestinationDefaultPolicyFail() throws Exception {
applyDefaultMaximumDestinationPolicy(10);
for (int i = 0; i < 11; i++) {
createQueue("queue." + i);
}
}
/**
* Test that a queue policy overrides the default policy
*/
@Test(expected = javax.jms.IllegalStateException.class)
public void testMaxDestinationOnQueuePolicy() throws Exception {
PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
applyMaximumDestinationPolicy(policyMap, new ActiveMQQueue("queue.>"),
5);
// This should fail even though the default policy is set to a limit of
// 10 because the
// queue policy overrides it
for (int i = 0; i < 6; i++) {
createQueue("queue." + i);
}
}
/**
* Test that 10 topics can be created when default policy allows it.
*/
@Test
public void testTopicMaxDestinationDefaultPolicySuccess() throws Exception {
applyDefaultMaximumDestinationPolicy(10);
for (int i = 0; i < 10; i++) {
createTopic("topic." + i);
}
}
/**
* Test that topic creation will faill when exceeding the limit
*/
@Test(expected = javax.jms.IllegalStateException.class)
public void testTopicMaxDestinationDefaultPolicyFail() throws Exception {
applyDefaultMaximumDestinationPolicy(20);
for (int i = 0; i < 21; i++) {
createTopic("topic." + i);
}
}
/**
* Test that no limit is enforced
*/
@Test
public void testTopicDefaultPolicyNoMaxDestinations() throws Exception {
// -1 is the default and signals no max destinations
applyDefaultMaximumDestinationPolicy(-1);
for (int i = 0; i < 100; i++) {
createTopic("topic." + i);
}
}
/**
* Test a mixture of queue and topic policies
*/
@Test
public void testComplexMaxDestinationPolicy() throws Exception {
PolicyMap policyMap = applyMaximumDestinationPolicy(new PolicyMap(),
new ActiveMQQueue("queue.>"), 5);
applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
10);
for (int i = 0; i < 5; i++) {
createQueue("queue." + i);
}
for (int i = 0; i < 10; i++) {
createTopic("topic." + i);
}
// Make sure that adding one more of either a topic or a queue fails
boolean fail = false;
try {
createTopic("topic.test");
} catch (javax.jms.IllegalStateException e) {
fail = true;
}
assertTrue(fail);
fail = false;
try {
createQueue("queue.test");
} catch (javax.jms.IllegalStateException e) {
fail = true;
}
assertTrue(fail);
}
/**
* Test child destinations of a policy
*/
@Test
public void testMaxDestinationPolicyOnChildDests() throws Exception {
applyMaximumDestinationPolicy(new PolicyMap(), new ActiveMQTopic(
"topic.>"), 10);
for (int i = 0; i < 10; i++) {
createTopic("topic.test" + i);
}
// Make sure that adding one more fails
boolean fail = false;
try {
createTopic("topic.abc.test");
} catch (javax.jms.IllegalStateException e) {
fail = true;
}
assertTrue(fail);
}
/**
* Test a topic policy overrides the default
*/
@Test(expected = javax.jms.IllegalStateException.class)
public void testMaxDestinationOnTopicPolicy() throws Exception {
PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
5);
// This should fail even though the default policy is set to a limit of
// 10 because the
// queue policy overrides it
for (int i = 0; i < 6; i++) {
createTopic("topic." + i);
}
}
private PolicyMap applyMaximumDestinationPolicy(PolicyMap policyMap,
ActiveMQDestination destination, int maxDestinations) {
PolicyEntry entry = new PolicyEntry();
entry.setDestination(destination);
entry.setMaxDestinations(maxDestinations);
policyMap.setPolicyEntries(Lists.newArrayList(entry));
broker.setDestinationPolicy(policyMap);
return policyMap;
}
private PolicyMap applyDefaultMaximumDestinationPolicy(int maxDestinations) {
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
if (maxDestinations >= 0) {
defaultEntry.setMaxDestinations(maxDestinations);
}
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
return policyMap;
}
private void createQueue(String queueName) throws Exception {
Queue queue = session.createQueue(queueName);
producer = session.createProducer(queue);
}
private void createTopic(String topicName) throws Exception {
Topic topic = session.createTopic(topicName);
producer = session.createProducer(topic);
}
}