From bed10aac1e4d8e1e6f5dc7817b63014525be299d Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 3 Mar 2006 10:34:33 +0000 Subject: [PATCH] allowed the maximum pending message count to be specified on the ActiveMQPrefetchPolicy or the ConsumerInfo git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382753 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 6 ++-- .../activemq/ActiveMQPrefetchPolicy.java | 15 ++++++++- .../apache/activemq/ActiveMQQueueBrowser.java | 4 ++- .../activemq/ActiveMQQueueReceiver.java | 4 +-- .../org/apache/activemq/ActiveMQSession.java | 31 ++++++++++++------- .../activemq/ActiveMQTopicSubscriber.java | 4 +-- .../broker/region/policy/PolicyEntry.java | 13 ++++++++ .../apache/activemq/command/ConsumerInfo.java | 16 ++++++++++ 8 files changed, 73 insertions(+), 20 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a95b34b90c..c71f5b4bfc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -116,18 +116,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * Create a MessageConsumer * * @param session - * @param value * @param dest * @param name * @param selector * @param prefetch + * @param maximumPendingMessageCount TODO * @param noLocal * @param browser * @param dispatchAsync + * @param value * @throws JMSException */ public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, - String name, String selector, int prefetch, boolean noLocal, boolean browser, boolean dispatchAsync) + String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync) throws JMSException { if (dest == null) { throw new InvalidDestinationException("Don't understand null destinations"); @@ -158,6 +159,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC this.info = new ConsumerInfo(consumerId); this.info.setSubcriptionName(name); this.info.setPrefetchSize(prefetch); + this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); this.info.setNoLocal(noLocal); this.info.setDispatchAsync(dispatchAsync); this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java index 67d9f8e232..dfd4af94b7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java @@ -33,7 +33,7 @@ public class ActiveMQPrefetchPolicy implements Serializable { private int topicPrefetch; private int durableTopicPrefetch; private int inputStreamPrefetch; - + private int maximumPendingMessageLimit; /** * Initialize default prefetch policies @@ -102,6 +102,19 @@ public class ActiveMQPrefetchPolicy implements Serializable { this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch); } + public int getMaximumPendingMessageLimit() { + return maximumPendingMessageLimit; + } + + /** + * Sets how many messages a broker will keep around, above the prefetch limit, for non-durable + * topics before starting to discard older messages. + */ + public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) { + this.maximumPendingMessageLimit = maximumPendingMessageLimit; + } + + private int getMaxPrefetchLimit(int value) { int result = Math.min(value, MAX_PREFETCH_SIZE); if (result < value) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java index f6e8b611c9..e0cd5a6ec1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java @@ -97,7 +97,9 @@ public class ActiveMQQueueBrowser implements */ private ActiveMQMessageConsumer createConsumer() throws JMSException { browseDone.set(false); - return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true, dispatchAsync) { + ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy(); + return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), + prefetchPolicy.getMaximumPendingMessageLimit(), false, true, dispatchAsync) { public void dispatch(MessageDispatch md) { if( md.getMessage()==null ) { browseDone.set(true); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java index d6568baa6d..39295978e6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java @@ -67,9 +67,9 @@ public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements * @throws JMSException */ protected ActiveMQQueueReceiver(ActiveMQSession theSession, - ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, boolean asyncDispatch) + ConsumerId consumerId, ActiveMQDestination destination, String selector, int prefetch, int maximumPendingMessageCount, boolean asyncDispatch) throws JMSException { - super(theSession, consumerId, destination, null, selector, prefetch, false, false, asyncDispatch); + super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount, false, false, asyncDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 2b6362f048..3de31748ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -802,14 +802,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta checkClosed(); int prefetch = 0; + ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); if (destination instanceof Topic) { - prefetch = connection.getPrefetchPolicy().getTopicPrefetch(); + prefetch = prefetchPolicy.getTopicPrefetch(); } else { - prefetch = connection.getPrefetchPolicy().getQueuePrefetch(); + prefetch = prefetchPolicy.getQueuePrefetch(); } - return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(destination), null, messageSelector, prefetch, false, false, asyncDispatch); + return new ActiveMQMessageConsumer(this, getNextConsumerId(), + ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch, + prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch); } /** @@ -870,9 +872,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException { checkClosed(); - return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(destination), null, messageSelector, connection.getPrefetchPolicy() - .getTopicPrefetch(), NoLocal, false, asyncDispatch); + ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); + return new ActiveMQMessageConsumer(this, getNextConsumerId(), + ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, + prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch); } /** @@ -1033,9 +1036,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta throws JMSException { checkClosed(); connection.checkClientIDWasManuallySpecified(); + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(topic), name, messageSelector, this.connection.getPrefetchPolicy() - .getDurableTopicPrefetch(), noLocal, false, asyncDispatch); + .transformDestination(topic), name, messageSelector, prefetchPolicy.getDurableTopicPrefetch(), + prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); } /** @@ -1155,8 +1159,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkClosed(); + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(queue), messageSelector, this.connection.getPrefetchPolicy().getQueuePrefetch(), asyncDispatch); + .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), + prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); } /** @@ -1247,9 +1253,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation - .transformDestination(topic), null, messageSelector, this.connection.getPrefetchPolicy() - .getTopicPrefetch(), noLocal, false, asyncDispatch); + .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(), + prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java index 65d3819b42..6593e4b424 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java @@ -112,9 +112,9 @@ public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements * @throws JMSException */ protected ActiveMQTopicSubscriber(ActiveMQSession theSession, - ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, + ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException { - super(theSession, consumerId, dest, name, selector, prefetch, noLocalValue, browserValue, asyncDispatch); + super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index c7114f930d..83f32bd53a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -20,6 +20,8 @@ import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Represents an entry in a {@link PolicyMap} for assigning policies to a @@ -31,6 +33,8 @@ import org.apache.activemq.filter.DestinationMapEntry; */ public class PolicyEntry extends DestinationMapEntry { + private static final Log log = LogFactory.getLog(PolicyEntry.class); + private DispatchPolicy dispatchPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private boolean sendAdvisoryIfNoConsumers; @@ -64,7 +68,16 @@ public class PolicyEntry extends DestinationMapEntry { public void configure(TopicSubscription subscription) { if (pendingMessageLimitStrategy != null) { int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); + int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); + if (consumerLimit > 0) { + if (value < 0 || consumerLimit < value) { + value = consumerLimit; + } + } if (value >= 0) { + if (log.isDebugEnabled()) { + log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId()); + } subscription.setMaximumPendingMessages(value); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java index ba87040575..697dc69b38 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -38,6 +38,7 @@ public class ConsumerInfo extends BaseCommand { protected ConsumerId consumerId; protected ActiveMQDestination destination; protected int prefetchSize; + protected int maximumPendingMessageLimit; protected boolean browser; protected boolean dispatchAsync; protected String selector; @@ -73,6 +74,7 @@ public class ConsumerInfo extends BaseCommand { info.consumerId = consumerId; info.destination = destination; info.prefetchSize = prefetchSize; + info.maximumPendingMessageLimit = maximumPendingMessageLimit; info.browser = browser; info.dispatchAsync = dispatchAsync; info.selector = selector; @@ -143,6 +145,20 @@ public class ConsumerInfo extends BaseCommand { this.prefetchSize = prefetchSize; } + /** + * How many messages a broker will keep around, above the prefetch limit, for non-durable + * topics before starting to discard older messages. + * + * @openwire:property version=1 + */ + public int getMaximumPendingMessageLimit() { + return maximumPendingMessageLimit; + } + + public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) { + this.maximumPendingMessageLimit = maximumPendingMessageLimit; + } + /** * Should the broker dispatch a message to the consumer async? If he does it async, then * he uses a more SEDA style of processing while if it is not done async, then he broker