diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 2251b5b5b5..16ddcd6ee2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -47,6 +47,7 @@ public abstract class AbstractSubscription implements Subscription { protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList(); private BooleanExpression selectorExpression; private ObjectName objectName; + private int cursorMemoryHighWaterMark = 70; public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { @@ -211,6 +212,14 @@ public abstract class AbstractSubscription implements Subscription { } + public int getCursorMemoryHighWaterMark(){ + return this.cursorMemoryHighWaterMark; + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){ + this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark; + } + public int countBeforeFull() { return getDispatchedQueueSize() - info.getPrefetchSize(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 0ad0173841..7f526af05a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -50,7 +50,7 @@ public abstract class BaseDestination implements Destination { protected final MessageStore store; protected SystemUsage systemUsage; protected MemoryUsage memoryUsage; - private boolean producerFlowControl = false; + private boolean producerFlowControl = true; private int maxProducersToAudit = 1024; private int maxAuditDepth = 2048; private boolean enableAudit = true; @@ -72,6 +72,7 @@ public abstract class BaseDestination implements Destination { protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; + protected int cursorMemoryHighWaterMark = 70; /** * @param broker @@ -374,6 +375,14 @@ public abstract class BaseDestination implements Destination { public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { this.deadLetterStrategy = deadLetterStrategy; } + + public int getCursorMemoryHighWaterMark() { + return this.cursorMemoryHighWaterMark; + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; + } /** * called when message is consumed @@ -510,5 +519,4 @@ public abstract class BaseDestination implements Destination { public void processDispatchNotification( MessageDispatchNotification messageDispatchNotification) throws Exception { } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 3fcd7d68f2..8a59c0487b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -105,6 +105,10 @@ public interface Destination extends Service, Task { public void setMinimumMessageSize(int minimumMessageSize); + public int getCursorMemoryHighWaterMark(); + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + /** * optionally called by a Subscriber - to inform the Destination its * ready for more messages diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 92037d7471..3d93292693 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -265,4 +265,12 @@ public class DestinationFilter implements Destination { MessageDispatchNotification messageDispatchNotification) throws Exception { next.processDispatchNotification(messageDispatchNotification); } + + public int getCursorMemoryHighWaterMark() { + return next.getCursorMemoryHighWaterMark(); + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0f24e865d4..8f0db770a2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -54,6 +54,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us super(broker,usageManager, context, info); this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this); this.pending.setSystemUsage(usageManager); + this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); @@ -115,6 +116,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } synchronized (pending) { pending.setSystemUsage(memoryManager); + pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); pending.start(); // If nothing was in the persistent store, then try to use the diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index e51a8d7505..5f05528022 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -525,6 +525,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { this.pending = pending; if (this.pending!=null) { this.pending.setSystemUsage(usageManager); + this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b6b49d8fd3..f51959aab1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -191,6 +191,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { messages.setMaxAuditDepth(getMaxAuditDepth()); messages.setMaxProducersToAudit(getMaxProducersToAudit()); messages.setUseCache(isUseCache()); + messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); if (messages.isRecoveryRequired()) { store.recover(new MessageRecoveryListener() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index f8b7028e8a..d408423e51 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -223,4 +223,8 @@ public interface Subscription extends SubscriptionRecovery { int countBeforeFull(); ConnectionContext getContext(); + + public int getCursorMemoryHighWaterMark(); + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java index aeadd4b5a9..3f94d6ca6a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -52,6 +52,7 @@ public class TempQueue extends Queue{ public void initialize() throws Exception { this.messages=new VMPendingMessageCursor(); + this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index f36e5554cc..b74032af0f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -76,6 +76,7 @@ public class TopicSubscription extends AbstractSubscription { public void init() throws Exception { this.matched.setSystemUsage(usageManager); + this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); this.matched.start(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 742c12b928..967d06714c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -249,6 +249,16 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { nonPersistent.setUseCache(useCache); } } + + public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { + super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); + if (persistent != null) { + persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); + } + if (nonPersistent != null) { + nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index fa74dca131..d7029e4083 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -59,7 +59,7 @@ public class PolicyEntry extends DestinationMapEntry { private int maxAuditDepth=2048; private int maxQueueAuditDepth=2048; private boolean enableAudit=true; - private boolean producerFlowControl = false; + private boolean producerFlowControl = true; private boolean optimizedDispatch=false; private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; @@ -82,6 +82,7 @@ public class PolicyEntry extends DestinationMapEntry { private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; + private int cursorMemoryHighWaterMark=70; public void configure(Broker broker,Queue queue) { @@ -140,6 +141,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); destination.setMaxExpirePageSize(getMaxExpirePageSize()); + destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -177,8 +179,8 @@ public class PolicyEntry extends DestinationMapEntry { String clientId = sub.getSubscriptionKey().getClientId(); String subName = sub.getSubscriptionKey().getSubscriptionName(); int prefetch = sub.getPrefetchSize(); + sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); //override prefetch size if not set by the Consumer - if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){ sub.setPrefetchSize(getDurableTopicPrefetch()); } @@ -189,6 +191,7 @@ public class PolicyEntry extends DestinationMapEntry { } sub.setMaxAuditDepth(getMaxAuditDepth()); sub.setMaxProducersToAudit(getMaxProducersToAudit()); + } public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { @@ -199,6 +202,7 @@ public class PolicyEntry extends DestinationMapEntry { if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){ sub.setPrefetchSize(getQueueBrowserPrefetch()); } + sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); } public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { @@ -209,6 +213,7 @@ public class PolicyEntry extends DestinationMapEntry { if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){ sub.setPrefetchSize(getQueuePrefetch()); } + sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); } // Properties @@ -661,6 +666,14 @@ public class PolicyEntry extends DestinationMapEntry { public void setDurableTopicPrefetch(int durableTopicPrefetch) { this.durableTopicPrefetch = durableTopicPrefetch; } + + public int getCursorMemoryHighWaterMark() { + return this.cursorMemoryHighWaterMark; + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; + } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java new file mode 100644 index 0000000000..c8dd27ba8d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.policy; + +import junit.framework.TestCase; + +public class PolicyConfigTest extends TestCase{ + + public void testNoop() { + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 51190b8d56..d33972efac 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -285,6 +285,14 @@ public class QueueDuplicatesFromStoreTest extends TestCase { public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { } + + public int getCursorMemoryHighWaterMark(){ + return 0; + } + + public void setCursorMemoryHighWaterMark( + int cursorMemoryHighWaterMark) { + } }; queue.addSubscription(contextNotInTx, subscription);