mirror of https://github.com/apache/activemq.git
Make maximum page size for queues configurable
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@618654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
071b4b109e
commit
d0f3d4dce9
|
@ -39,6 +39,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
private int maxProducersToAudit=1024;
|
private int maxProducersToAudit=1024;
|
||||||
private int maxAuditDepth=1;
|
private int maxAuditDepth=1;
|
||||||
private boolean enableAudit=true;
|
private boolean enableAudit=true;
|
||||||
|
private int maxPageSize=1000;
|
||||||
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -153,6 +154,11 @@ public abstract class BaseDestination implements Destination {
|
||||||
destinationStatistics.getProducers().getCount() != 0;
|
destinationStatistics.getProducers().getCount() != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxPageSize() {
|
||||||
|
return maxPageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPageSize(int maxPageSize) {
|
||||||
|
this.maxPageSize = maxPageSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,4 +84,8 @@ public interface Destination extends Service {
|
||||||
void setEnableAudit(boolean enableAudit);
|
void setEnableAudit(boolean enableAudit);
|
||||||
|
|
||||||
boolean isActive();
|
boolean isActive();
|
||||||
|
|
||||||
|
int getMaxPageSize();
|
||||||
|
|
||||||
|
public void setMaxPageSize(int maxPageSize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,4 +171,12 @@ public class DestinationFilter implements Destination {
|
||||||
public boolean isActive() {
|
public boolean isActive() {
|
||||||
return next.isActive();
|
return next.isActive();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxPageSize() {
|
||||||
|
return next.getMaxPageSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPageSize(int maxPageSize) {
|
||||||
|
next.setMaxPageSize(maxPageSize);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
* @version $Revision: 1.28 $
|
* @version $Revision: 1.28 $
|
||||||
*/
|
*/
|
||||||
public class Queue extends BaseDestination implements Task {
|
public class Queue extends BaseDestination implements Task {
|
||||||
private static int MAXIMUM_PAGE_SIZE = 1000;
|
|
||||||
private final Log log;
|
private final Log log;
|
||||||
private final List<Subscription> consumers = new ArrayList<Subscription>(50);
|
private final List<Subscription> consumers = new ArrayList<Subscription>(50);
|
||||||
private PendingMessageCursor messages;
|
private PendingMessageCursor messages;
|
||||||
|
@ -974,7 +973,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<MessageReference> buildList(boolean force) throws Exception {
|
private List<MessageReference> buildList(boolean force) throws Exception {
|
||||||
final int toPageIn = MAXIMUM_PAGE_SIZE - pagedInMessages.size();
|
final int toPageIn = getMaxPageSize() - pagedInMessages.size();
|
||||||
List<MessageReference> result = null;
|
List<MessageReference> result = null;
|
||||||
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
|
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
|
||||||
messages.setMaxBatchSize(toPageIn);
|
messages.setMaxBatchSize(toPageIn);
|
||||||
|
|
|
@ -56,6 +56,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
private boolean enableAudit=true;
|
private boolean enableAudit=true;
|
||||||
private boolean producerFlowControl = true;
|
private boolean producerFlowControl = true;
|
||||||
private boolean optimizedDispatch=false;
|
private boolean optimizedDispatch=false;
|
||||||
|
private int maxPageSize=1000;
|
||||||
|
|
||||||
public void configure(Broker broker,Queue queue) {
|
public void configure(Broker broker,Queue queue) {
|
||||||
if (dispatchPolicy != null) {
|
if (dispatchPolicy != null) {
|
||||||
|
@ -76,6 +77,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
queue.setEnableAudit(isEnableAudit());
|
queue.setEnableAudit(isEnableAudit());
|
||||||
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
|
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
|
||||||
queue.setMaxProducersToAudit(getMaxProducersToAudit());
|
queue.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||||
|
queue.setMaxPageSize(getMaxPageSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void configure(Topic topic) {
|
public void configure(Topic topic) {
|
||||||
|
@ -96,6 +98,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
topic.setEnableAudit(isEnableAudit());
|
topic.setEnableAudit(isEnableAudit());
|
||||||
topic.setMaxAuditDepth(getMaxAuditDepth());
|
topic.setMaxAuditDepth(getMaxAuditDepth());
|
||||||
topic.setMaxProducersToAudit(getMaxProducersToAudit());
|
topic.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||||
|
topic.setMaxPageSize(getMaxPageSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||||
|
@ -350,4 +353,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
this.optimizedDispatch = optimizedDispatch;
|
this.optimizedDispatch = optimizedDispatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxPageSize() {
|
||||||
|
return maxPageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPageSize(int maxPageSize) {
|
||||||
|
this.maxPageSize = maxPageSize;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue