git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@799090 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2009-07-29 21:40:29 +00:00
parent d5333d4e44
commit fc63a92339
4 changed files with 152 additions and 11 deletions

View File

@ -26,9 +26,17 @@ import org.apache.commons.logging.LogFactory;
* @org.apache.xbean.XBean element="prefetchPolicy"
* @version $Revision: 1.3 $
*/
public class ActiveMQPrefetchPolicy implements Serializable {
public class ActiveMQPrefetchPolicy extends Object implements Serializable {
public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
public static final int DEFAULT_QUEUE_PREFETCH = 1000;
public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
private static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
private int queuePrefetch;
private int queueBrowserPrefetch;
private int topicPrefetch;
@ -41,12 +49,12 @@ public class ActiveMQPrefetchPolicy implements Serializable {
* Initialize default prefetch policies
*/
public ActiveMQPrefetchPolicy() {
this.queuePrefetch = 1000;
this.queueBrowserPrefetch = 500;
this.topicPrefetch = MAX_PREFETCH_SIZE;
this.durableTopicPrefetch = 100;
this.optimizeDurableTopicPrefetch = 1000;
this.inputStreamPrefetch = 100;
this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
}
/**
@ -156,5 +164,18 @@ public class ActiveMQPrefetchPolicy implements Serializable {
public void setInputStreamPrefetch(int inputStreamPrefetch) {
this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
}
public boolean equals(Object object){
if (object instanceof ActiveMQPrefetchPolicy){
ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
return this.queuePrefetch == other.queuePrefetch &&
this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
this.topicPrefetch == other.topicPrefetch &&
this.durableTopicPrefetch == other.durableTopicPrefetch &&
this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
this.inputStreamPrefetch == other.inputStreamPrefetch;
}
return false;
}
}

View File

@ -154,6 +154,9 @@ public abstract class AbstractSubscription implements Subscription {
public int getPrefetchSize() {
return info.getPrefetchSize();
}
public void setPrefetchSize(int newSize) {
info.setPrefetchSize(newSize);
}
public boolean isRecoveryRequired() {
return true;

View File

@ -22,6 +22,7 @@ import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
@ -47,11 +48,24 @@ public class QueueRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
ActiveMQDestination destination = info.getDestination();
PolicyEntry entry = null;
if (destination != null && broker.getDestinationPolicy() != null) {
entry = broker.getDestinationPolicy().getEntryFor(destination);
}
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker,usageManager, context, info);
QueueBrowserSubscription sub = new QueueBrowserSubscription(broker,usageManager, context, info);
if (entry != null) {
entry.configure(broker, usageManager, sub);
}
return sub;
} else {
return new QueueSubscription(broker, usageManager,context, info);
QueueSubscription sub = new QueueSubscription(broker, usageManager,context, info);
if (entry != null) {
entry.configure(broker, usageManager, sub);
}
return sub;
}
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueBrowserSubscription;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -75,6 +78,11 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean advisoryForConsumed;
private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
public void configure(Broker broker,Queue queue) {
baseConfiguration(queue);
@ -155,6 +163,11 @@ public class PolicyEntry extends DestinationMapEntry {
}
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
//override prefetch size if not set by the Consumer
int prefetch=subscription.getConsumerInfo().getPrefetchSize();
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
}
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}
@ -164,6 +177,11 @@ public class PolicyEntry extends DestinationMapEntry {
String clientId = sub.getSubscriptionKey().getClientId();
String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
//override prefetch size if not set by the Consumer
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
sub.setPrefetchSize(getDurableTopicPrefetch());
}
if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
cursor.setSystemUsage(memoryManager);
@ -172,6 +190,26 @@ public class PolicyEntry extends DestinationMapEntry {
sub.setMaxAuditDepth(getMaxAuditDepth());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
int prefetch = sub.getPrefetchSize();
//override prefetch size if not set by the Consumer
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
sub.setPrefetchSize(getQueueBrowserPrefetch());
}
}
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
int prefetch = sub.getPrefetchSize();
//override prefetch size if not set by the Consumer
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
sub.setPrefetchSize(getQueuePrefetch());
}
}
// Properties
// -------------------------------------------------------------------------
@ -559,5 +597,70 @@ public class PolicyEntry extends DestinationMapEntry {
return expireMessagesPeriod;
}
/**
* Get the queuePrefetch
* @return the queuePrefetch
*/
public int getQueuePrefetch() {
return this.queuePrefetch;
}
/**
* Set the queuePrefetch
* @param queuePrefetch the queuePrefetch to set
*/
public void setQueuePrefetch(int queuePrefetch) {
this.queuePrefetch = queuePrefetch;
}
/**
* Get the queueBrowserPrefetch
* @return the queueBrowserPrefetch
*/
public int getQueueBrowserPrefetch() {
return this.queueBrowserPrefetch;
}
/**
* Set the queueBrowserPrefetch
* @param queueBrowserPrefetch the queueBrowserPrefetch to set
*/
public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
this.queueBrowserPrefetch = queueBrowserPrefetch;
}
/**
* Get the topicPrefetch
* @return the topicPrefetch
*/
public int getTopicPrefetch() {
return this.topicPrefetch;
}
/**
* Set the topicPrefetch
* @param topicPrefetch the topicPrefetch to set
*/
public void setTopicPrefetch(int topicPrefetch) {
this.topicPrefetch = topicPrefetch;
}
/**
* Get the durableTopicPrefetch
* @return the durableTopicPrefetch
*/
public int getDurableTopicPrefetch() {
return this.durableTopicPrefetch;
}
/**
* Set the durableTopicPrefetch
* @param durableTopicPrefetch the durableTopicPrefetch to set
*/
public void setDurableTopicPrefetch(int durableTopicPrefetch) {
this.durableTopicPrefetch = durableTopicPrefetch;
}
}