From e35519f3a116231e3e88452e2c0752609c3cf931 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 17 Dec 2010 18:33:13 +0000 Subject: [PATCH] resolve: https://issues.apache.org/jira/browse/AMQ-3095 - with test git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1050463 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/policy/PolicyEntry.java | 2 +- .../activemq/bugs/DurableConsumerTest.java | 29 ++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 1ba2f4e4ec..c62f2738bb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -209,7 +209,7 @@ public class PolicyEntry extends DestinationMapEntry { int prefetch = sub.getPrefetchSize(); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); //override prefetch size if not set by the Consumer - if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){ + if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || prefetch == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH){ sub.setPrefetchSize(getDurableTopicPrefetch()); } if (pendingDurableSubscriberPolicy != null) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index 4296c274d3..18f5e8baf9 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -42,11 +42,15 @@ import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.ObjectName; import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBStore; @@ -297,6 +301,30 @@ public class DurableConsumerTest extends CombinationTestSupport{ public void testConsumer() throws Exception{ doTestConsumer(false); } + + public void testPrefetchViaBrokerConfig() throws Exception { + + Integer prefetchVal = new Integer(150); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setDurableTopicPrefetch(prefetchVal.intValue()); + policyEntry.setPrioritizedMessages(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); + + factory = createConnectionFactory(); + Connection consumerConnection = factory.createConnection(); + consumerConnection.setClientID(CONSUMER_NAME); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = consumerSession.createTopic(getClass().getName()); + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME); + consumerConnection.start(); + + ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0]; + Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize"); + assertEquals(prefetchVal, prefetchFromSubView); + } public void doTestConsumer(boolean forceRecover) throws Exception{ @@ -407,7 +435,6 @@ public class DurableConsumerTest extends CombinationTestSupport{ answer.setPersistenceAdapter(kaha); answer.addConnector(bindAddress); answer.setUseShutdownHook(false); - answer.setUseJmx(false); answer.setAdvisorySupport(false); answer.setDedicatedTaskRunner(useDedicatedTaskRunner); }