diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 874653688e..1d44d96025 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -39,6 +39,7 @@ public abstract class BaseDestination implements Destination { private int maxProducersToAudit=1024; private int maxAuditDepth=1; private boolean enableAudit=true; + private int maxPageSize=1000; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); /** @@ -152,7 +153,12 @@ public abstract class BaseDestination implements Destination { return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; } - - + public int getMaxPageSize() { + return maxPageSize; + } + + public void setMaxPageSize(int maxPageSize) { + this.maxPageSize = maxPageSize; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index bfa97451cd..6c4bbaef82 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -84,4 +84,8 @@ public interface Destination extends Service { void setEnableAudit(boolean enableAudit); boolean isActive(); + + int getMaxPageSize(); + + public void setMaxPageSize(int maxPageSize); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 2eb319a35c..612f7b851c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -171,4 +171,12 @@ public class DestinationFilter implements Destination { public boolean isActive() { return next.isActive(); } + + public int getMaxPageSize() { + return next.getMaxPageSize(); + } + + public void setMaxPageSize(int maxPageSize) { + next.setMaxPageSize(maxPageSize); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 412e307cb9..fa4c943800 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -70,7 +70,6 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.28 $ */ public class Queue extends BaseDestination implements Task { - private static int MAXIMUM_PAGE_SIZE = 1000; private final Log log; private final List consumers = new ArrayList(50); private PendingMessageCursor messages; @@ -974,7 +973,7 @@ public class Queue extends BaseDestination implements Task { } private List buildList(boolean force) throws Exception { - final int toPageIn = MAXIMUM_PAGE_SIZE - pagedInMessages.size(); + final int toPageIn = getMaxPageSize() - pagedInMessages.size(); List result = null; if ((force || !consumers.isEmpty()) && toPageIn > 0) { messages.setMaxBatchSize(toPageIn); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index f41a6c97b5..9cab9ee4b0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -56,6 +56,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean enableAudit=true; private boolean producerFlowControl = true; private boolean optimizedDispatch=false; + private int maxPageSize=1000; public void configure(Broker broker,Queue queue) { if (dispatchPolicy != null) { @@ -76,6 +77,7 @@ public class PolicyEntry extends DestinationMapEntry { queue.setEnableAudit(isEnableAudit()); queue.setMaxAuditDepth(getMaxQueueAuditDepth()); queue.setMaxProducersToAudit(getMaxProducersToAudit()); + queue.setMaxPageSize(getMaxPageSize()); } public void configure(Topic topic) { @@ -96,6 +98,7 @@ public class PolicyEntry extends DestinationMapEntry { topic.setEnableAudit(isEnableAudit()); topic.setMaxAuditDepth(getMaxAuditDepth()); topic.setMaxProducersToAudit(getMaxProducersToAudit()); + topic.setMaxPageSize(getMaxPageSize()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -349,5 +352,13 @@ public class PolicyEntry extends DestinationMapEntry { public void setOptimizedDispatch(boolean optimizedDispatch) { this.optimizedDispatch = optimizedDispatch; } + + public int getMaxPageSize() { + return maxPageSize; + } + + public void setMaxPageSize(int maxPageSize) { + this.maxPageSize = maxPageSize; + } }