diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index 0994cc3ca2..aa25236a2a 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -607,6 +607,7 @@ org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.* org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.* org/apache/activemq/usecases/TopicProducerFlowControlTest.* + org/apache/activemq/usecases/TempTopicProducerFlowControlTest.* org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.* org/apache/activemq/bugs/AMQ2314Test.* org/apache/activemq/kaha/LoadTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 555e364f06..3830c03a16 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -147,7 +147,7 @@ public class ManagedRegionBroker extends RegionBroker { @Override protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } @Override diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java index 98316cbcc4..cf2900b278 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java @@ -35,9 +35,9 @@ public class ManagedTempQueueRegion extends TempQueueRegion { private final ManagedRegionBroker regionBroker; - public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + public ManagedTempQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); this.regionBroker = broker; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java index 7469c90c24..c955caa2ab 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java @@ -78,17 +78,13 @@ public abstract class AbstractTempRegion extends AbstractRegion { } } - protected abstract Destination doCreateDestination( - ConnectionContext context, ActiveMQDestination destination) - throws Exception; - protected synchronized Destination createDestination( ConnectionContext context, ActiveMQDestination destination) throws Exception { Destination result = cachedDestinations.remove(new CachedDestination( destination)); if (result == null) { - result = doCreateDestination(context, destination); + result = destinationFactory.createDestination(context, destination, destinationStatistics); } return result; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index c8af5178b4..6c146f0c8b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -77,6 +77,7 @@ public class DestinationFactoryImpl extends DestinationFactory { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); + configureQueue(queue, destination); queue.initialize(); return queue; } else { @@ -89,6 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory { } else if (destination.isTemporary()) { Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); + configureTopic(topic, destination); topic.initialize(); return topic; } else { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index a68e96665f..ab9357e4b2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -169,7 +169,7 @@ public class RegionBroker extends EmptyBroker { } protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index fce069abaa..75a6d6ccee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -33,24 +33,10 @@ import org.slf4j.LoggerFactory; * */ public class TempQueueRegion extends AbstractTempRegion { - private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class); - private final BrokerService brokerService; - - public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + + public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - // We should allow the following to be configurable via a Destination - // Policy - // setAutoCreateDestinations(false); - this.brokerService = brokerService; - } - - protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); - brokerService.getDestinationPolicy(); - configureQueue(result, destination); - result.initialize(); - return result; } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { @@ -87,17 +73,4 @@ public class TempQueueRegion extends AbstractTempRegion { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { processDispatchNotificationViaDestination(messageDispatchNotification); } - - protected void configureQueue(Queue queue, ActiveMQDestination destination) { - if (broker == null) { - throw new IllegalStateException("broker property is not set"); - } - if (broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { - entry.configure(broker,queue); - } - } - } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java index 644c142294..e1ffd05910 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java @@ -21,6 +21,7 @@ import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; @@ -37,9 +38,6 @@ public class TempTopicRegion extends AbstractTempRegion { public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - // We should allow the following to be configurable via a Destination - // Policy - // setAutoCreateDestinations(false); } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { @@ -82,9 +80,4 @@ public class TempTopicRegion extends AbstractTempRegion { super.removeDestination(context, destination, timeout); } - - protected Destination doCreateDestination(ConnectionContext context, - ActiveMQDestination destination) throws Exception { - return destinationFactory.createDestination(context, destination, destinationStatistics); - } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java new file mode 100644 index 0000000000..ca8b697bba --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import javax.jms.Destination; +import javax.jms.Session; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + +public class TempTopicProducerFlowControlTest extends TopicProducerFlowControlTest { + + @Override + protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) { + PolicyEntry tpe = new PolicyEntry(); + tpe.setTempTopic(true); + tpe.setMemoryLimit(destinationMemLimit); + tpe.setProducerFlowControl(true); + tpe.setAdvisoryWhenFull(true); + pm.setDefaultEntry(tpe); + + broker.setDestinationPolicy(pm); + } + + @Override + protected Destination createDestination(Session session) throws Exception { + return session.createTemporaryTopic(); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java index 008430824d..9abe1250fa 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java @@ -30,6 +30,8 @@ import javax.jms.Session; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -103,6 +105,9 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis connectionFactory.setAlwaysSyncSend(true); connectionFactory.setProducerWindowSize(1024); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(5000); + connectionFactory.setPrefetchPolicy(prefetchPolicy); // Start the test destination listener Connection c = connectionFactory.createConnection(); c.start(); @@ -158,10 +163,10 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get()); assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return blockedCounter.get() > 0; - } - }, 5 * 1000)); + public boolean isSatisified() throws Exception { + return blockedCounter.get() > 0; + } + }, 5 * 1000)); } protected Destination createDestination(Session listenerSession) throws Exception {