diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 9b3fc371b4..4dc2f81559 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -115,4 +115,16 @@ abstract public class AbstractSubscription implements Subscription { public boolean isSlaveBroker(){ return broker.isSlaveBroker(); } + + public ConnectionContext getContext() { + return context; + } + + public ConsumerInfo getInfo() { + return info; + } + + public BooleanExpression getSelector() { + return selector; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 9b41f123e6..3213566783 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -204,7 +204,17 @@ public class TopicRegion extends AbstractRegion { return sub; } else { - return new TopicSubscription(broker,context, info, memoryManager); + TopicSubscription answer = new TopicSubscription(broker,context, info, memoryManager); + + // lets configure the subscription depending on the destination + ActiveMQDestination destination = info.getDestination(); + if (destination != null && policyMap != null) { + PolicyEntry entry = policyMap.getEntryFor(destination); + if (entry != null) { + entry.configure(answer); + } + } + return answer; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 45a60804d0..2d8393ee93 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -42,7 +42,7 @@ public class TopicSubscription extends AbstractSubscription { final protected UsageManager usageManager; protected int dispatched=0; protected int delivered=0; - private int maximumPendingMessages = 0; + private int maximumPendingMessages = -1; public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws InvalidSelectorException { super(broker,context, info); @@ -56,15 +56,17 @@ public class TopicSubscription extends AbstractSubscription { // have not been dispatched (i.e. we allow the prefetch buffer to be filled) dispatch(node); } else { - synchronized (matched) { - matched.addLast(node); - - // NOTE - be careful about the slaveBroker! - if (maximumPendingMessages > 0) { - // lets discard old messages as we are a slow consumer - while (matched.size() > maximumPendingMessages) { - MessageReference oldMessage = (MessageReference) matched.removeFirst(); - oldMessage.decrementReferenceCount(); + if (maximumPendingMessages != 0) { + synchronized (matched) { + matched.addLast(node); + + // NOTE - be careful about the slaveBroker! + if (maximumPendingMessages > 0) { + // lets discard old messages as we are a slow consumer + while (matched.size() > maximumPendingMessages) { + MessageReference oldMessage = (MessageReference) matched.removeFirst(); + oldMessage.decrementReferenceCount(); + } } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java new file mode 100644 index 0000000000..585d4c48ae --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java @@ -0,0 +1,43 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.TopicSubscription; + +/** + * This PendingMessageLimitStrategy is configured to a constant value for all subscriptions. + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class ConstantPendingMessageLimitStrategy implements PendingMessageLimitStrategy { + + private int limit = -1; + + public int getMaximumPendingMessageLimit(TopicSubscription subscription) { + return limit; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java new file mode 100644 index 0000000000..9fe1ba014f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java @@ -0,0 +1,41 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.TopicSubscription; + +/** + * A pluggable strategy to calculate the maximum number of messages that are allowed to be pending on + * consumers (in addition to their prefetch sizes). + * + * Once the limit is reached, non-durable topics can then start discarding old messages. + * This allows us to keep dispatching messages to slow consumers while not blocking fast consumers + * and discarding the messages oldest first. + * + * @version $Revision$ + */ +public interface PendingMessageLimitStrategy { + + /** + * Calculate the maximum number of pending messages (in excess of the prefetch size) + * for the given subscription + * + * @return the maximum or -1 if there is no maximum + */ + int getMaximumPendingMessageLimit(TopicSubscription subscription); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 3450f5acde..c7114f930d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.filter.DestinationMapEntry; /** @@ -35,6 +36,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean sendAdvisoryIfNoConsumers; private DeadLetterStrategy deadLetterStrategy; private int messageGroupHashBucketCount = 1024; + private PendingMessageLimitStrategy pendingMessageLimitStrategy; public void configure(Queue queue) { if (dispatchPolicy != null) { @@ -59,6 +61,15 @@ public class PolicyEntry extends DestinationMapEntry { topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); } + public void configure(TopicSubscription subscription) { + if (pendingMessageLimitStrategy != null) { + int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); + if (value >= 0) { + subscription.setMaximumPendingMessages(value); + } + } + } + // Properties // ------------------------------------------------------------------------- public DispatchPolicy getDispatchPolicy() { @@ -94,7 +105,8 @@ public class PolicyEntry extends DestinationMapEntry { } /** - * Sets the policy used to determine which dead letter queue destination should be used + * Sets the policy used to determine which dead letter queue destination + * should be used */ public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { this.deadLetterStrategy = deadLetterStrategy; @@ -105,14 +117,30 @@ public class PolicyEntry extends DestinationMapEntry { } /** - * Sets the number of hash buckets to use for the message group functionality. - * This is only applicable to using message groups to parallelize processing of a queue - * while preserving order across an individual JMSXGroupID header value. - * This value sets the number of hash buckets that will be used (i.e. the maximum possible concurrency). + * Sets the number of hash buckets to use for the message group + * functionality. This is only applicable to using message groups to + * parallelize processing of a queue while preserving order across an + * individual JMSXGroupID header value. This value sets the number of hash + * buckets that will be used (i.e. the maximum possible concurrency). */ public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) { this.messageGroupHashBucketCount = messageGroupHashBucketCount; } - - + + public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { + return pendingMessageLimitStrategy; + } + + /** + * Sets the strategy to calculate the maximum number of messages that are + * allowed to be pending on consumers (in addition to their prefetch sizes). + * + * Once the limit is reached, non-durable topics can then start discarding + * old messages. This allows us to keep dispatching messages to slow + * consumers while not blocking fast consumers and discarding the messages + * oldest first. + */ + public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) { + this.pendingMessageLimitStrategy = pendingMessageLimitStrategy; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java new file mode 100644 index 0000000000..73babcb9d2 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java @@ -0,0 +1,50 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.TopicSubscription; + +/** + * This PendingMessageLimitStrategy sets the maximum pending message limit value to be + * a multiplier of the prefetch limit of the subscription. + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class PrefetchRatePendingMessageLimitStrategy implements PendingMessageLimitStrategy { + + private double multiplier = 0.5; + + public int getMaximumPendingMessageLimit(TopicSubscription subscription) { + int prefetchSize = subscription.getConsumerInfo().getPrefetchSize(); + return (int) (prefetchSize * multiplier); + } + + public double getMultiplier() { + return multiplier; + } + + /** + * Sets the multiplier of the prefetch size which will be used to define the maximum number of pending + * messages for non-durable topics before messages are discarded. + */ + public void setMultiplier(double rate) { + this.multiplier = rate; + } + +}