mirror of https://github.com/apache/activemq.git
This closes #97
This commit is contained in:
commit
062b0c2c08
|
@ -25,7 +25,10 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.jms.IllegalStateException;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||
import org.apache.activemq.DestinationDoesNotExistException;
|
||||
|
@ -64,6 +67,7 @@ public abstract class AbstractRegion implements Region {
|
|||
protected final SystemUsage usageManager;
|
||||
protected final DestinationFactory destinationFactory;
|
||||
protected final DestinationStatistics destinationStatistics;
|
||||
protected final RegionStatistics regionStatistics = new RegionStatistics();
|
||||
protected final RegionBroker broker;
|
||||
protected boolean autoCreateDestinations = true;
|
||||
protected final TaskRunnerFactory taskRunnerFactory;
|
||||
|
@ -120,7 +124,16 @@ public abstract class AbstractRegion implements Region {
|
|||
} finally {
|
||||
destinationsLock.readLock().unlock();
|
||||
}
|
||||
destinations.clear();
|
||||
|
||||
destinationsLock.writeLock().lock();
|
||||
try {
|
||||
destinations.clear();
|
||||
regionStatistics.getAdvisoryDestinations().reset();
|
||||
regionStatistics.getDestinations().reset();
|
||||
regionStatistics.getAllDestinations().reset();
|
||||
} finally {
|
||||
destinationsLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
|
||||
|
@ -131,6 +144,10 @@ public abstract class AbstractRegion implements Region {
|
|||
Destination dest = destinations.get(destination);
|
||||
if (dest == null) {
|
||||
if (destination.isTemporary() == false || createIfTemporary) {
|
||||
// Limit the number of destinations that can be created if
|
||||
// maxDestinations has been set on a policy
|
||||
validateMaxDestinations(destination);
|
||||
|
||||
LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
|
||||
dest = createDestination(context, destination);
|
||||
// intercept if there is a valid interceptor defined
|
||||
|
@ -140,6 +157,7 @@ public abstract class AbstractRegion implements Region {
|
|||
}
|
||||
dest.start();
|
||||
destinations.put(destination, dest);
|
||||
updateRegionDestCounts(destination, 1);
|
||||
destinationMap.put(destination, dest);
|
||||
addSubscriptionsForDestination(context, dest);
|
||||
}
|
||||
|
@ -157,6 +175,61 @@ public abstract class AbstractRegion implements Region {
|
|||
return subscriptions;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Updates the counts in RegionStatistics based on whether or not the destination
|
||||
* is an Advisory Destination or not
|
||||
*
|
||||
* @param destination the destination being used to determine which counters to update
|
||||
* @param count the count to add to the counters
|
||||
*/
|
||||
protected void updateRegionDestCounts(ActiveMQDestination destination, int count) {
|
||||
if (destination != null) {
|
||||
if (AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||
regionStatistics.getAdvisoryDestinations().add(count);
|
||||
} else {
|
||||
regionStatistics.getDestinations().add(count);
|
||||
}
|
||||
regionStatistics.getAllDestinations().add(count);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method checks whether or not the destination can be created based on
|
||||
* {@link PolicyEntry#getMaxDestinations}, if it has been set. Advisory
|
||||
* topics are ignored.
|
||||
*
|
||||
* @param destination
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void validateMaxDestinations(ActiveMQDestination destination)
|
||||
throws Exception {
|
||||
if (broker.getDestinationPolicy() != null) {
|
||||
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
|
||||
// Make sure the destination is not an advisory topic
|
||||
if (entry != null && entry.getMaxDestinations() >= 0
|
||||
&& !AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||
// If there is an entry for this destination, look up the set of
|
||||
// destinations associated with this policy
|
||||
// If a destination isn't specified, then just count up
|
||||
// non-advisory destinations (ie count all destinations)
|
||||
int destinationSize = (int) (entry.getDestination() != null ?
|
||||
destinationMap.get(entry.getDestination()).size() : regionStatistics.getDestinations().getCount());
|
||||
if (destinationSize >= entry.getMaxDestinations()) {
|
||||
if (entry.getDestination() != null) {
|
||||
throw new IllegalStateException(
|
||||
"The maxmimum number of destinations allowed ("+ entry.getMaxDestinations() +
|
||||
") for the policy " + entry.getDestination() + " has already been reached.");
|
||||
// No destination has been set (default policy)
|
||||
} else {
|
||||
throw new IllegalStateException("The maxmimum number of destinations allowed ("
|
||||
+ entry.getMaxDestinations() + ") has already been reached.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest)
|
||||
throws Exception {
|
||||
|
||||
|
@ -210,6 +283,8 @@ public abstract class AbstractRegion implements Region {
|
|||
try {
|
||||
Destination dest = destinations.remove(destination);
|
||||
if (dest != null) {
|
||||
updateRegionDestCounts(destination, -1);
|
||||
|
||||
// timeout<0 or we timed out, we now force any remaining
|
||||
// subscriptions to un-subscribe.
|
||||
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
|
||||
|
@ -620,7 +695,10 @@ public abstract class AbstractRegion implements Region {
|
|||
destination = destinationInterceptor.intercept(destination);
|
||||
}
|
||||
getDestinationMap().put(key, destination);
|
||||
destinations.put(key, destination);
|
||||
Destination prev = destinations.put(key, destination);
|
||||
if (prev == null) {
|
||||
updateRegionDestCounts(key, 1);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
destinationsLock.writeLock().unlock();
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.broker.region;
|
||||
|
||||
import org.apache.activemq.management.CountStatisticImpl;
|
||||
import org.apache.activemq.management.StatsImpl;
|
||||
|
||||
/**
|
||||
* The J2EE Statistics for the Connection.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class RegionStatistics extends StatsImpl {
|
||||
|
||||
private CountStatisticImpl advisoryDestinations;
|
||||
private CountStatisticImpl destinations;
|
||||
private CountStatisticImpl allDestinations;
|
||||
|
||||
public RegionStatistics() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
public RegionStatistics(boolean enabled) {
|
||||
|
||||
advisoryDestinations = new CountStatisticImpl("advisoryTopics", "The number of advisory destinations in the region");
|
||||
destinations = new CountStatisticImpl("destinations", "The number of regular (non-adivsory) destinations in the region");
|
||||
allDestinations = new CountStatisticImpl("allDestinations", "The total number of destinations, including advisory destinations, in the region");
|
||||
|
||||
addStatistic("advisoryDestinations", advisoryDestinations);
|
||||
addStatistic("destinations", destinations);
|
||||
addStatistic("allDestinations", allDestinations);
|
||||
|
||||
this.setEnabled(enabled);
|
||||
}
|
||||
|
||||
public CountStatisticImpl getAdvisoryDestinations() {
|
||||
return advisoryDestinations;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getDestinations() {
|
||||
return destinations;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getAllDestinations() {
|
||||
return allDestinations;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
super.reset();
|
||||
advisoryDestinations.reset();
|
||||
destinations.reset();
|
||||
allDestinations.reset();
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
super.setEnabled(enabled);
|
||||
advisoryDestinations.setEnabled(enabled);
|
||||
destinations.setEnabled(enabled);
|
||||
allDestinations.setEnabled(enabled);
|
||||
}
|
||||
|
||||
public void setParent(RegionStatistics parent) {
|
||||
if (parent != null) {
|
||||
advisoryDestinations.setParent(parent.getAdvisoryDestinations());
|
||||
destinations.setParent(parent.getDestinations());
|
||||
allDestinations.setParent(parent.getAllDestinations());
|
||||
} else {
|
||||
advisoryDestinations.setParent(null);
|
||||
destinations.setParent(null);
|
||||
allDestinations.setParent(null);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -99,6 +99,8 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private boolean reduceMemoryFootprint;
|
||||
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
|
||||
private boolean doOptimzeMessageStorage = true;
|
||||
private int maxDestinations = -1;
|
||||
|
||||
/*
|
||||
* percentage of in-flight messages above which optimize message store is disabled
|
||||
*/
|
||||
|
@ -962,4 +964,19 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public boolean isPersistJMSRedelivered() {
|
||||
return persistJMSRedelivered;
|
||||
}
|
||||
|
||||
public int getMaxDestinations() {
|
||||
return maxDestinations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of destinations that can be created
|
||||
*
|
||||
* @param maxDestinations
|
||||
* maximum number of destinations
|
||||
*/
|
||||
public void setMaxDestinations(int maxDestinations) {
|
||||
this.maxDestinations = maxDestinations;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.policy;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* This unit test is to test that setting the property "maxDestinations" on
|
||||
* PolicyEntry works correctly. If this property is set, it will limit the
|
||||
* number of destinations that can be created. Advisory topics will be ignored
|
||||
* during calculations.
|
||||
*
|
||||
*/
|
||||
public class MaxDestinationsPolicyTest {
|
||||
BrokerService broker;
|
||||
ConnectionFactory factory;
|
||||
Connection connection;
|
||||
Session session;
|
||||
MessageProducer producer;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
broker = new BrokerService();
|
||||
|
||||
File testDataDir = new File("target/activemq-data/AMQ-5751");
|
||||
broker.setDataDirectoryFile(testDataDir);
|
||||
broker.setUseJmx(true);
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.getSystemUsage().getMemoryUsage().setLimit(1024l * 1024 * 64);
|
||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(new File(testDataDir, "kahadb"));
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
broker.addConnector("tcp://localhost:0");
|
||||
broker.start();
|
||||
factory = new ActiveMQConnectionFactory(broker.getTransportConnectors()
|
||||
.get(0).getConnectUri().toString());
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
session.close();
|
||||
connection.stop();
|
||||
connection.close();
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that 10 queues can be created when default policy allows it.
|
||||
*/
|
||||
@Test
|
||||
public void testMaxDestinationDefaultPolicySuccess() throws Exception {
|
||||
applyDefaultMaximumDestinationPolicy(10);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
createQueue("queue." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that default policy prevents going beyond max
|
||||
*/
|
||||
@Test(expected = javax.jms.IllegalStateException.class)
|
||||
public void testMaxDestinationDefaultPolicyFail() throws Exception {
|
||||
applyDefaultMaximumDestinationPolicy(10);
|
||||
|
||||
for (int i = 0; i < 11; i++) {
|
||||
createQueue("queue." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a queue policy overrides the default policy
|
||||
*/
|
||||
@Test(expected = javax.jms.IllegalStateException.class)
|
||||
public void testMaxDestinationOnQueuePolicy() throws Exception {
|
||||
PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
|
||||
applyMaximumDestinationPolicy(policyMap, new ActiveMQQueue("queue.>"),
|
||||
5);
|
||||
|
||||
// This should fail even though the default policy is set to a limit of
|
||||
// 10 because the
|
||||
// queue policy overrides it
|
||||
for (int i = 0; i < 6; i++) {
|
||||
createQueue("queue." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that 10 topics can be created when default policy allows it.
|
||||
*/
|
||||
@Test
|
||||
public void testTopicMaxDestinationDefaultPolicySuccess() throws Exception {
|
||||
applyDefaultMaximumDestinationPolicy(10);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
createTopic("topic." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that topic creation will faill when exceeding the limit
|
||||
*/
|
||||
@Test(expected = javax.jms.IllegalStateException.class)
|
||||
public void testTopicMaxDestinationDefaultPolicyFail() throws Exception {
|
||||
applyDefaultMaximumDestinationPolicy(20);
|
||||
|
||||
for (int i = 0; i < 21; i++) {
|
||||
createTopic("topic." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that no limit is enforced
|
||||
*/
|
||||
@Test
|
||||
public void testTopicDefaultPolicyNoMaxDestinations() throws Exception {
|
||||
// -1 is the default and signals no max destinations
|
||||
applyDefaultMaximumDestinationPolicy(-1);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
createTopic("topic." + i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a mixture of queue and topic policies
|
||||
*/
|
||||
@Test
|
||||
public void testComplexMaxDestinationPolicy() throws Exception {
|
||||
PolicyMap policyMap = applyMaximumDestinationPolicy(new PolicyMap(),
|
||||
new ActiveMQQueue("queue.>"), 5);
|
||||
applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
|
||||
10);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
createQueue("queue." + i);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
createTopic("topic." + i);
|
||||
}
|
||||
|
||||
// Make sure that adding one more of either a topic or a queue fails
|
||||
boolean fail = false;
|
||||
try {
|
||||
createTopic("topic.test");
|
||||
} catch (javax.jms.IllegalStateException e) {
|
||||
fail = true;
|
||||
}
|
||||
assertTrue(fail);
|
||||
|
||||
fail = false;
|
||||
try {
|
||||
createQueue("queue.test");
|
||||
} catch (javax.jms.IllegalStateException e) {
|
||||
fail = true;
|
||||
}
|
||||
assertTrue(fail);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test child destinations of a policy
|
||||
*/
|
||||
@Test
|
||||
public void testMaxDestinationPolicyOnChildDests() throws Exception {
|
||||
applyMaximumDestinationPolicy(new PolicyMap(), new ActiveMQTopic(
|
||||
"topic.>"), 10);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
createTopic("topic.test" + i);
|
||||
}
|
||||
|
||||
// Make sure that adding one more fails
|
||||
boolean fail = false;
|
||||
try {
|
||||
createTopic("topic.abc.test");
|
||||
} catch (javax.jms.IllegalStateException e) {
|
||||
fail = true;
|
||||
}
|
||||
assertTrue(fail);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a topic policy overrides the default
|
||||
*/
|
||||
@Test(expected = javax.jms.IllegalStateException.class)
|
||||
public void testMaxDestinationOnTopicPolicy() throws Exception {
|
||||
PolicyMap policyMap = applyDefaultMaximumDestinationPolicy(10);
|
||||
applyMaximumDestinationPolicy(policyMap, new ActiveMQTopic("topic.>"),
|
||||
5);
|
||||
|
||||
// This should fail even though the default policy is set to a limit of
|
||||
// 10 because the
|
||||
// queue policy overrides it
|
||||
for (int i = 0; i < 6; i++) {
|
||||
createTopic("topic." + i);
|
||||
}
|
||||
}
|
||||
|
||||
private PolicyMap applyMaximumDestinationPolicy(PolicyMap policyMap,
|
||||
ActiveMQDestination destination, int maxDestinations) {
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setDestination(destination);
|
||||
entry.setMaxDestinations(maxDestinations);
|
||||
policyMap.setPolicyEntries(Lists.newArrayList(entry));
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
return policyMap;
|
||||
}
|
||||
|
||||
private PolicyMap applyDefaultMaximumDestinationPolicy(int maxDestinations) {
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
if (maxDestinations >= 0) {
|
||||
defaultEntry.setMaxDestinations(maxDestinations);
|
||||
}
|
||||
policyMap.setDefaultEntry(defaultEntry);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
return policyMap;
|
||||
}
|
||||
|
||||
private void createQueue(String queueName) throws Exception {
|
||||
Queue queue = session.createQueue(queueName);
|
||||
producer = session.createProducer(queue);
|
||||
}
|
||||
|
||||
private void createTopic(String topicName) throws Exception {
|
||||
Topic topic = session.createTopic(topicName);
|
||||
producer = session.createProducer(topic);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue