mirror of https://github.com/apache/activemq.git
some fixes for: https://issues.apache.org/jira/browse/AMQ-4487
Allow the maxProducersToAudit value to be set of Queue and QueueBrowser subscriptions which can help to workaround OOM errors on QueueBrowsers when there are more than the max number of producers audited. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1489180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bc4392bdde
commit
37ede54486
|
@ -834,6 +834,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
|
||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
this.maxProducersToAudit = maxProducersToAudit;
|
||||
if (this.pending != null) {
|
||||
this.pending.setMaxProducersToAudit(maxProducersToAudit);
|
||||
}
|
||||
}
|
||||
|
||||
public int getMaxAuditDepth() {
|
||||
|
@ -842,6 +845,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
|
||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||
this.maxAuditDepth = maxAuditDepth;
|
||||
if (this.pending != null) {
|
||||
this.pending.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isUsePrefetchExtension() {
|
||||
|
|
|
@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Represents an entry in a {@link PolicyMap} for assigning policies to a
|
||||
* specific destination or a hierarchical wildcard area of destinations.
|
||||
*
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class PolicyEntry extends DestinationMapEntry {
|
||||
|
||||
|
@ -118,7 +118,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
|
||||
queue.setMessages(messages);
|
||||
}
|
||||
|
||||
|
||||
queue.setUseConsumerPriority(isUseConsumerPriority());
|
||||
queue.setStrictOrderDispatch(isStrictOrderDispatch());
|
||||
queue.setOptimizedDispatch(isOptimizedDispatch());
|
||||
|
@ -144,7 +144,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
}
|
||||
topic.setLazyDispatch(isLazyDispatch());
|
||||
}
|
||||
|
||||
|
||||
public void baseConfiguration(Broker broker,BaseDestination destination) {
|
||||
destination.setProducerFlowControl(isProducerFlowControl());
|
||||
destination.setAlwaysRetroactive(isAlwaysRetroactive());
|
||||
|
@ -231,19 +231,21 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
sub.setMaxAuditDepth(auditDepth);
|
||||
}
|
||||
sub.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
sub.setUsePrefetchExtension(isUsePrefetchExtension());
|
||||
sub.setUsePrefetchExtension(isUsePrefetchExtension());
|
||||
}
|
||||
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
|
||||
configurePrefetch(sub);
|
||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
sub.setUsePrefetchExtension(isUsePrefetchExtension());
|
||||
sub.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
}
|
||||
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
|
||||
configurePrefetch(sub);
|
||||
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
sub.setUsePrefetchExtension(isUsePrefetchExtension());
|
||||
sub.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
}
|
||||
|
||||
public void configurePrefetch(Subscription subscription) {
|
||||
|
@ -442,13 +444,13 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
|
||||
this.alwaysRetroactive = alwaysRetroactive;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Set's the interval at which warnings about producers being blocked by
|
||||
* resource usage will be triggered. Values of 0 or less will disable
|
||||
* warnings
|
||||
*
|
||||
*
|
||||
* @param blockedProducerWarningInterval the interval at which warning about
|
||||
* blocked producers will be triggered.
|
||||
*/
|
||||
|
@ -457,14 +459,14 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return the interval at which warning about blocked producers will be
|
||||
* triggered.
|
||||
*/
|
||||
public long getBlockedProducerWarningInterval() {
|
||||
return blockedProducerWarningInterval;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the maxProducersToAudit
|
||||
*/
|
||||
|
@ -522,23 +524,23 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setOptimizedDispatch(boolean optimizedDispatch) {
|
||||
this.optimizedDispatch = optimizedDispatch;
|
||||
}
|
||||
|
||||
|
||||
public int getMaxPageSize() {
|
||||
return maxPageSize;
|
||||
}
|
||||
|
||||
public void setMaxPageSize(int maxPageSize) {
|
||||
this.maxPageSize = maxPageSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public int getMaxBrowsePageSize() {
|
||||
return maxBrowsePageSize;
|
||||
}
|
||||
|
||||
public void setMaxBrowsePageSize(int maxPageSize) {
|
||||
this.maxBrowsePageSize = maxPageSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isUseCache() {
|
||||
return useCache;
|
||||
}
|
||||
|
@ -553,8 +555,8 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
|
||||
public void setMinimumMessageSize(long minimumMessageSize) {
|
||||
this.minimumMessageSize = minimumMessageSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isUseConsumerPriority() {
|
||||
return useConsumerPriority;
|
||||
}
|
||||
|
@ -665,7 +667,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
|
||||
this.advisoryForConsumed = advisoryForConsumed;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the advisdoryForFastProducers
|
||||
*/
|
||||
|
@ -683,15 +685,15 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setMaxExpirePageSize(int maxExpirePageSize) {
|
||||
this.maxExpirePageSize = maxExpirePageSize;
|
||||
}
|
||||
|
||||
|
||||
public int getMaxExpirePageSize() {
|
||||
return maxExpirePageSize;
|
||||
}
|
||||
|
||||
|
||||
public void setExpireMessagesPeriod(long expireMessagesPeriod) {
|
||||
this.expireMessagesPeriod = expireMessagesPeriod;
|
||||
}
|
||||
|
||||
|
||||
public long getExpireMessagesPeriod() {
|
||||
return expireMessagesPeriod;
|
||||
}
|
||||
|
@ -759,7 +761,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setDurableTopicPrefetch(int durableTopicPrefetch) {
|
||||
this.durableTopicPrefetch = durableTopicPrefetch;
|
||||
}
|
||||
|
||||
|
||||
public boolean isUsePrefetchExtension() {
|
||||
return this.usePrefetchExtension;
|
||||
}
|
||||
|
@ -767,17 +769,17 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
|
||||
this.usePrefetchExtension = usePrefetchExtension;
|
||||
}
|
||||
|
||||
|
||||
public int getCursorMemoryHighWaterMark() {
|
||||
return this.cursorMemoryHighWaterMark;
|
||||
}
|
||||
|
||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
||||
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
|
||||
}
|
||||
}
|
||||
|
||||
public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
|
||||
this.storeUsageHighWaterMark = storeUsageHighWaterMark;
|
||||
this.storeUsageHighWaterMark = storeUsageHighWaterMark;
|
||||
}
|
||||
|
||||
public int getStoreUsageHighWaterMark() {
|
||||
|
@ -787,12 +789,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
|
||||
this.slowConsumerStrategy = slowConsumerStrategy;
|
||||
}
|
||||
|
||||
|
||||
public SlowConsumerStrategy getSlowConsumerStrategy() {
|
||||
return this.slowConsumerStrategy;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public boolean isPrioritizedMessages() {
|
||||
return this.prioritizedMessages;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue