mirror of https://github.com/apache/activemq.git
resolve: https://issues.apache.org/jira/browse/AMQ-3095 - with test
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1050463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
11d79743bf
commit
e35519f3a1
|
@ -209,7 +209,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
int prefetch = sub.getPrefetchSize();
|
int prefetch = sub.getPrefetchSize();
|
||||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||||
//override prefetch size if not set by the Consumer
|
//override prefetch size if not set by the Consumer
|
||||||
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
|
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || prefetch == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH){
|
||||||
sub.setPrefetchSize(getDurableTopicPrefetch());
|
sub.setPrefetchSize(getDurableTopicPrefetch());
|
||||||
}
|
}
|
||||||
if (pendingDurableSubscriberPolicy != null) {
|
if (pendingDurableSubscriberPolicy != null) {
|
||||||
|
|
|
@ -42,11 +42,15 @@ import javax.jms.TopicConnectionFactory;
|
||||||
import javax.jms.TopicPublisher;
|
import javax.jms.TopicPublisher;
|
||||||
import javax.jms.TopicSession;
|
import javax.jms.TopicSession;
|
||||||
import javax.jms.TopicSubscriber;
|
import javax.jms.TopicSubscriber;
|
||||||
|
import javax.management.ObjectName;
|
||||||
import junit.framework.Test;
|
import junit.framework.Test;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerView;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||||
import org.apache.activemq.store.kahadb.KahaDBStore;
|
import org.apache.activemq.store.kahadb.KahaDBStore;
|
||||||
|
@ -298,6 +302,30 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
||||||
doTestConsumer(false);
|
doTestConsumer(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPrefetchViaBrokerConfig() throws Exception {
|
||||||
|
|
||||||
|
Integer prefetchVal = new Integer(150);
|
||||||
|
PolicyEntry policyEntry = new PolicyEntry();
|
||||||
|
policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
|
||||||
|
policyEntry.setPrioritizedMessages(true);
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
policyMap.setDefaultEntry(policyEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
broker.start();
|
||||||
|
|
||||||
|
factory = createConnectionFactory();
|
||||||
|
Connection consumerConnection = factory.createConnection();
|
||||||
|
consumerConnection.setClientID(CONSUMER_NAME);
|
||||||
|
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = consumerSession.createTopic(getClass().getName());
|
||||||
|
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
|
||||||
|
consumerConnection.start();
|
||||||
|
|
||||||
|
ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
|
||||||
|
Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
|
||||||
|
assertEquals(prefetchVal, prefetchFromSubView);
|
||||||
|
}
|
||||||
|
|
||||||
public void doTestConsumer(boolean forceRecover) throws Exception{
|
public void doTestConsumer(boolean forceRecover) throws Exception{
|
||||||
|
|
||||||
if (forceRecover) {
|
if (forceRecover) {
|
||||||
|
@ -407,7 +435,6 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
||||||
answer.setPersistenceAdapter(kaha);
|
answer.setPersistenceAdapter(kaha);
|
||||||
answer.addConnector(bindAddress);
|
answer.addConnector(bindAddress);
|
||||||
answer.setUseShutdownHook(false);
|
answer.setUseShutdownHook(false);
|
||||||
answer.setUseJmx(false);
|
|
||||||
answer.setAdvisorySupport(false);
|
answer.setAdvisorySupport(false);
|
||||||
answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
|
answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue