allowed the maximum pending message count to be specified on the ActiveMQPrefetchPolicy or the ConsumerInfo

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-03 10:34:33 +00:00
parent 6d003a89e2
commit bed10aac1e
8 changed files with 73 additions and 20 deletions

View File

@ -116,18 +116,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* Create a MessageConsumer * Create a MessageConsumer
* *
* @param session * @param session
* @param value
* @param dest * @param dest
* @param name * @param name
* @param selector * @param selector
* @param prefetch * @param prefetch
* @param maximumPendingMessageCount TODO
* @param noLocal * @param noLocal
* @param browser * @param browser
* @param dispatchAsync * @param dispatchAsync
* @param value
* @throws JMSException * @throws JMSException
*/ */
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch, boolean noLocal, boolean browser, boolean dispatchAsync) String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync)
throws JMSException { throws JMSException {
if (dest == null) { if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations"); throw new InvalidDestinationException("Don't understand null destinations");
@ -158,6 +159,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.info = new ConsumerInfo(consumerId); this.info = new ConsumerInfo(consumerId);
this.info.setSubcriptionName(name); this.info.setSubcriptionName(name);
this.info.setPrefetchSize(prefetch); this.info.setPrefetchSize(prefetch);
this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
this.info.setNoLocal(noLocal); this.info.setNoLocal(noLocal);
this.info.setDispatchAsync(dispatchAsync); this.info.setDispatchAsync(dispatchAsync);
this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());

View File

@ -33,7 +33,7 @@ public class ActiveMQPrefetchPolicy implements Serializable {
private int topicPrefetch; private int topicPrefetch;
private int durableTopicPrefetch; private int durableTopicPrefetch;
private int inputStreamPrefetch; private int inputStreamPrefetch;
private int maximumPendingMessageLimit;
/** /**
* Initialize default prefetch policies * Initialize default prefetch policies
@ -102,6 +102,19 @@ public class ActiveMQPrefetchPolicy implements Serializable {
this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
} }
public int getMaximumPendingMessageLimit() {
return maximumPendingMessageLimit;
}
/**
* Sets how many messages a broker will keep around, above the prefetch limit, for non-durable
* topics before starting to discard older messages.
*/
public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
this.maximumPendingMessageLimit = maximumPendingMessageLimit;
}
private int getMaxPrefetchLimit(int value) { private int getMaxPrefetchLimit(int value) {
int result = Math.min(value, MAX_PREFETCH_SIZE); int result = Math.min(value, MAX_PREFETCH_SIZE);
if (result < value) { if (result < value) {

View File

@ -97,7 +97,9 @@ public class ActiveMQQueueBrowser implements
*/ */
private ActiveMQMessageConsumer createConsumer() throws JMSException { private ActiveMQMessageConsumer createConsumer() throws JMSException {
browseDone.set(false); browseDone.set(false);
return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true, dispatchAsync) { ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(),
prefetchPolicy.getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
public void dispatch(MessageDispatch md) { public void dispatch(MessageDispatch md) {
if( md.getMessage()==null ) { if( md.getMessage()==null ) {
browseDone.set(true); browseDone.set(true);

View File

@ -67,9 +67,9 @@ public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements
* @throws JMSException * @throws JMSException
*/ */
protected ActiveMQQueueReceiver(ActiveMQSession theSession, protected ActiveMQQueueReceiver(ActiveMQSession theSession,
ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, boolean asyncDispatch) ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, int maximumPendingMessageCount, boolean asyncDispatch)
throws JMSException { throws JMSException {
super(theSession, consumerId, destination, null, selector, prefetch, false, false, asyncDispatch); super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount, false, false, asyncDispatch);
} }
/** /**

View File

@ -802,14 +802,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
checkClosed(); checkClosed();
int prefetch = 0; int prefetch = 0;
ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
if (destination instanceof Topic) { if (destination instanceof Topic) {
prefetch = connection.getPrefetchPolicy().getTopicPrefetch(); prefetch = prefetchPolicy.getTopicPrefetch();
} else { } else {
prefetch = connection.getPrefetchPolicy().getQueuePrefetch(); prefetch = prefetchPolicy.getQueuePrefetch();
} }
return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation return new ActiveMQMessageConsumer(this, getNextConsumerId(),
.transformDestination(destination), null, messageSelector, prefetch, false, false, asyncDispatch); ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
} }
/** /**
@ -870,9 +872,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
throws JMSException { throws JMSException {
checkClosed(); checkClosed();
return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
.transformDestination(destination), null, messageSelector, connection.getPrefetchPolicy() return new ActiveMQMessageConsumer(this, getNextConsumerId(),
.getTopicPrefetch(), NoLocal, false, asyncDispatch); ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector,
prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch);
} }
/** /**
@ -1033,9 +1036,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
throws JMSException { throws JMSException {
checkClosed(); checkClosed();
connection.checkClientIDWasManuallySpecified(); connection.checkClientIDWasManuallySpecified();
ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
.transformDestination(topic), name, messageSelector, this.connection.getPrefetchPolicy() .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(),
.getDurableTopicPrefetch(), noLocal, false, asyncDispatch); prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
} }
/** /**
@ -1155,8 +1159,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*/ */
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
checkClosed(); checkClosed();
ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation
.transformDestination(queue), messageSelector, this.connection.getPrefetchPolicy().getQueuePrefetch(), asyncDispatch); .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
} }
/** /**
@ -1247,9 +1253,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*/ */
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
checkClosed(); checkClosed();
ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
.transformDestination(topic), null, messageSelector, this.connection.getPrefetchPolicy() .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(),
.getTopicPrefetch(), noLocal, false, asyncDispatch); prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
} }
/** /**

View File

@ -112,9 +112,9 @@ public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements
* @throws JMSException * @throws JMSException
*/ */
protected ActiveMQTopicSubscriber(ActiveMQSession theSession, protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException { boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
super(theSession, consumerId, dest, name, selector, prefetch, noLocalValue, browserValue, asyncDispatch); super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch);
} }
/** /**

View File

@ -20,6 +20,8 @@ import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* Represents an entry in a {@link PolicyMap} for assigning policies to a * Represents an entry in a {@link PolicyMap} for assigning policies to a
@ -31,6 +33,8 @@ import org.apache.activemq.filter.DestinationMapEntry;
*/ */
public class PolicyEntry extends DestinationMapEntry { public class PolicyEntry extends DestinationMapEntry {
private static final Log log = LogFactory.getLog(PolicyEntry.class);
private DispatchPolicy dispatchPolicy; private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
@ -64,7 +68,16 @@ public class PolicyEntry extends DestinationMapEntry {
public void configure(TopicSubscription subscription) { public void configure(TopicSubscription subscription) {
if (pendingMessageLimitStrategy != null) { if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
if (consumerLimit > 0) {
if (value < 0 || consumerLimit < value) {
value = consumerLimit;
}
}
if (value >= 0) { if (value >= 0) {
if (log.isDebugEnabled()) {
log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId());
}
subscription.setMaximumPendingMessages(value); subscription.setMaximumPendingMessages(value);
} }
} }

View File

@ -38,6 +38,7 @@ public class ConsumerInfo extends BaseCommand {
protected ConsumerId consumerId; protected ConsumerId consumerId;
protected ActiveMQDestination destination; protected ActiveMQDestination destination;
protected int prefetchSize; protected int prefetchSize;
protected int maximumPendingMessageLimit;
protected boolean browser; protected boolean browser;
protected boolean dispatchAsync; protected boolean dispatchAsync;
protected String selector; protected String selector;
@ -73,6 +74,7 @@ public class ConsumerInfo extends BaseCommand {
info.consumerId = consumerId; info.consumerId = consumerId;
info.destination = destination; info.destination = destination;
info.prefetchSize = prefetchSize; info.prefetchSize = prefetchSize;
info.maximumPendingMessageLimit = maximumPendingMessageLimit;
info.browser = browser; info.browser = browser;
info.dispatchAsync = dispatchAsync; info.dispatchAsync = dispatchAsync;
info.selector = selector; info.selector = selector;
@ -143,6 +145,20 @@ public class ConsumerInfo extends BaseCommand {
this.prefetchSize = prefetchSize; this.prefetchSize = prefetchSize;
} }
/**
* How many messages a broker will keep around, above the prefetch limit, for non-durable
* topics before starting to discard older messages.
*
* @openwire:property version=1
*/
public int getMaximumPendingMessageLimit() {
return maximumPendingMessageLimit;
}
public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
this.maximumPendingMessageLimit = maximumPendingMessageLimit;
}
/** /**
* Should the broker dispatch a message to the consumer async? If he does it async, then * Should the broker dispatch a message to the consumer async? If he does it async, then
* he uses a more SEDA style of processing while if it is not done async, then he broker * he uses a more SEDA style of processing while if it is not done async, then he broker