From 37ede544868e2bf662f06dcbb2bffae5dd2f30a9 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 3 Jun 2013 21:12:44 +0000 Subject: [PATCH] some fixes for: https://issues.apache.org/jira/browse/AMQ-4487 Allow the maxProducersToAudit value to be set of Queue and QueueBrowser subscriptions which can help to workaround OOM errors on QueueBrowsers when there are more than the max number of producers audited. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1489180 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 6 ++ .../broker/region/policy/PolicyEntry.java | 62 ++++++++++--------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index db3c6089a9..dc1fd9459f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -834,6 +834,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void setMaxProducersToAudit(int maxProducersToAudit) { this.maxProducersToAudit = maxProducersToAudit; + if (this.pending != null) { + this.pending.setMaxProducersToAudit(maxProducersToAudit); + } } public int getMaxAuditDepth() { @@ -842,6 +845,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; + if (this.pending != null) { + this.pending.setMaxAuditDepth(maxAuditDepth); + } } public boolean isUsePrefetchExtension() { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 3f9ce63237..b5134a3937 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory; /** * Represents an entry in a {@link PolicyMap} for assigning policies to a * specific destination or a hierarchical wildcard area of destinations. - * + * * @org.apache.xbean.XBean - * + * */ public class PolicyEntry extends DestinationMapEntry { @@ -118,7 +118,7 @@ public class PolicyEntry extends DestinationMapEntry { PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); queue.setMessages(messages); } - + queue.setUseConsumerPriority(isUseConsumerPriority()); queue.setStrictOrderDispatch(isStrictOrderDispatch()); queue.setOptimizedDispatch(isOptimizedDispatch()); @@ -144,7 +144,7 @@ public class PolicyEntry extends DestinationMapEntry { } topic.setLazyDispatch(isLazyDispatch()); } - + public void baseConfiguration(Broker broker,BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setAlwaysRetroactive(isAlwaysRetroactive()); @@ -231,19 +231,21 @@ public class PolicyEntry extends DestinationMapEntry { sub.setMaxAuditDepth(auditDepth); } sub.setMaxProducersToAudit(getMaxProducersToAudit()); - sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } - + public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { configurePrefetch(sub); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setMaxProducersToAudit(getMaxProducersToAudit()); } - + public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { configurePrefetch(sub); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setMaxProducersToAudit(getMaxProducersToAudit()); } public void configurePrefetch(Subscription subscription) { @@ -442,13 +444,13 @@ public class PolicyEntry extends DestinationMapEntry { public void setAlwaysRetroactive(boolean alwaysRetroactive) { this.alwaysRetroactive = alwaysRetroactive; } - - + + /** * Set's the interval at which warnings about producers being blocked by * resource usage will be triggered. Values of 0 or less will disable * warnings - * + * * @param blockedProducerWarningInterval the interval at which warning about * blocked producers will be triggered. */ @@ -457,14 +459,14 @@ public class PolicyEntry extends DestinationMapEntry { } /** - * + * * @return the interval at which warning about blocked producers will be * triggered. */ public long getBlockedProducerWarningInterval() { return blockedProducerWarningInterval; } - + /** * @return the maxProducersToAudit */ @@ -522,23 +524,23 @@ public class PolicyEntry extends DestinationMapEntry { public void setOptimizedDispatch(boolean optimizedDispatch) { this.optimizedDispatch = optimizedDispatch; } - + public int getMaxPageSize() { return maxPageSize; } public void setMaxPageSize(int maxPageSize) { this.maxPageSize = maxPageSize; - } - + } + public int getMaxBrowsePageSize() { return maxBrowsePageSize; } public void setMaxBrowsePageSize(int maxPageSize) { this.maxBrowsePageSize = maxPageSize; - } - + } + public boolean isUseCache() { return useCache; } @@ -553,8 +555,8 @@ public class PolicyEntry extends DestinationMapEntry { public void setMinimumMessageSize(long minimumMessageSize) { this.minimumMessageSize = minimumMessageSize; - } - + } + public boolean isUseConsumerPriority() { return useConsumerPriority; } @@ -665,7 +667,7 @@ public class PolicyEntry extends DestinationMapEntry { public void setAdvisoryForConsumed(boolean advisoryForConsumed) { this.advisoryForConsumed = advisoryForConsumed; } - + /** * @return the advisdoryForFastProducers */ @@ -683,15 +685,15 @@ public class PolicyEntry extends DestinationMapEntry { public void setMaxExpirePageSize(int maxExpirePageSize) { this.maxExpirePageSize = maxExpirePageSize; } - + public int getMaxExpirePageSize() { return maxExpirePageSize; } - + public void setExpireMessagesPeriod(long expireMessagesPeriod) { this.expireMessagesPeriod = expireMessagesPeriod; } - + public long getExpireMessagesPeriod() { return expireMessagesPeriod; } @@ -759,7 +761,7 @@ public class PolicyEntry extends DestinationMapEntry { public void setDurableTopicPrefetch(int durableTopicPrefetch) { this.durableTopicPrefetch = durableTopicPrefetch; } - + public boolean isUsePrefetchExtension() { return this.usePrefetchExtension; } @@ -767,17 +769,17 @@ public class PolicyEntry extends DestinationMapEntry { public void setUsePrefetchExtension(boolean usePrefetchExtension) { this.usePrefetchExtension = usePrefetchExtension; } - + public int getCursorMemoryHighWaterMark() { return this.cursorMemoryHighWaterMark; } public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; - } + } public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { - this.storeUsageHighWaterMark = storeUsageHighWaterMark; + this.storeUsageHighWaterMark = storeUsageHighWaterMark; } public int getStoreUsageHighWaterMark() { @@ -787,12 +789,12 @@ public class PolicyEntry extends DestinationMapEntry { public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { this.slowConsumerStrategy = slowConsumerStrategy; } - + public SlowConsumerStrategy getSlowConsumerStrategy() { return this.slowConsumerStrategy; } - - + + public boolean isPrioritizedMessages() { return this.prioritizedMessages; }