https://issues.apache.org/jira/browse/AMQ-4091 - apply destination policy to temp topics, allows a memory limit to be specified to partition temp dest usage and enable producer flow control

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-10-02 20:51:00 +00:00
parent 9fbf52df7a
commit 1005fc5062
10 changed files with 63 additions and 50 deletions

View File

@ -607,6 +607,7 @@
<exclude>org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.*</exclude>
<exclude>org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.*</exclude>
<exclude>org/apache/activemq/usecases/TopicProducerFlowControlTest.*</exclude>
<exclude>org/apache/activemq/usecases/TempTopicProducerFlowControlTest.*</exclude>
<exclude>org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.*</exclude>
<exclude>org/apache/activemq/bugs/AMQ2314Test.*</exclude>
<exclude>org/apache/activemq/kaha/LoadTest.*</exclude>

View File

@ -147,7 +147,7 @@ public class ManagedRegionBroker extends RegionBroker {
@Override
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
@Override

View File

@ -35,9 +35,9 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedTempQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = broker;
}

View File

@ -78,17 +78,13 @@ public abstract class AbstractTempRegion extends AbstractRegion {
}
}
protected abstract Destination doCreateDestination(
ConnectionContext context, ActiveMQDestination destination)
throws Exception;
protected synchronized Destination createDestination(
ConnectionContext context, ActiveMQDestination destination)
throws Exception {
Destination result = cachedDestinations.remove(new CachedDestination(
destination));
if (result == null) {
result = doCreateDestination(context, destination);
result = destinationFactory.createDestination(context, destination, destinationStatistics);
}
return result;
}

View File

@ -77,6 +77,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
return queue;
} else {
@ -89,6 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
} else if (destination.isTemporary()) {
Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
topic.initialize();
return topic;
} else {

View File

@ -169,7 +169,7 @@ public class RegionBroker extends EmptyBroker {
}
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {

View File

@ -33,24 +33,10 @@ import org.slf4j.LoggerFactory;
*
*/
public class TempQueueRegion extends AbstractTempRegion {
private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
private final BrokerService brokerService;
public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
// Policy
// setAutoCreateDestinations(false);
this.brokerService = brokerService;
}
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
brokerService.getDestinationPolicy();
configureQueue(result, destination);
result.initialize();
return result;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
@ -87,17 +73,4 @@ public class TempQueueRegion extends AbstractTempRegion {
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
processDispatchNotificationViaDestination(messageDispatchNotification);
}
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
if (broker == null) {
throw new IllegalStateException("broker property is not set");
}
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(broker,queue);
}
}
}
}

View File

@ -21,6 +21,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
@ -37,9 +38,6 @@ public class TempTopicRegion extends AbstractTempRegion {
public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
// Policy
// setAutoCreateDestinations(false);
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
@ -82,9 +80,4 @@ public class TempTopicRegion extends AbstractTempRegion {
super.removeDestination(context, destination, timeout);
}
protected Destination doCreateDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
return destinationFactory.createDestination(context, destination, destinationStatistics);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import javax.jms.Destination;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
public class TempTopicProducerFlowControlTest extends TopicProducerFlowControlTest {
@Override
protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
PolicyEntry tpe = new PolicyEntry();
tpe.setTempTopic(true);
tpe.setMemoryLimit(destinationMemLimit);
tpe.setProducerFlowControl(true);
tpe.setAdvisoryWhenFull(true);
pm.setDefaultEntry(tpe);
broker.setDestinationPolicy(pm);
}
@Override
protected Destination createDestination(Session session) throws Exception {
return session.createTemporaryTopic();
}
}

View File

@ -30,6 +30,8 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -103,6 +105,9 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
connectionFactory.setAlwaysSyncSend(true);
connectionFactory.setProducerWindowSize(1024);
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(5000);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
// Start the test destination listener
Connection c = connectionFactory.createConnection();
c.start();
@ -158,10 +163,10 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get());
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return blockedCounter.get() > 0;
}
}, 5 * 1000));
public boolean isSatisified() throws Exception {
return blockedCounter.get() > 0;
}
}, 5 * 1000));
}
protected Destination createDestination(Session listenerSession) throws Exception {