From cc6fec68f41d7eb796376d635d9d06b136412735 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 7 Apr 2006 17:58:17 +0000 Subject: [PATCH] added a hook to eagerly evict expired messages on non-durable topics first before we apply other eviction policies such as old messages etc git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@392349 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/IndirectMessageReference.java | 16 ++++++++ .../broker/region/MessageReference.java | 6 +++ .../broker/region/TopicSubscription.java | 34 ++++++++++++++-- .../policy/MessageEvictionStrategy.java | 5 +++ .../MessageEvictionStrategySupport.java | 40 +++++++++++++++++++ .../policy/OldestMessageEvictionStrategy.java | 2 +- ...ageWithLowestPriorityEvictionStrategy.java | 2 +- 7 files changed, 99 insertions(+), 6 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 39b41209e2..adad07e524 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -59,6 +59,8 @@ public class IndirectMessageReference implements MessageReference { private int referenceCount; /** the size of the message **/ private int cachedSize = 0; + /** the expiration time of the message */ + private long expiration; /** * Only used by the END_OF_BROWSE_MARKER singleton @@ -71,6 +73,7 @@ public class IndirectMessageReference implements MessageReference { this.groupID = null; this.groupSequence = 0; this.targetConsumerId=null; + this.expiration = message.getExpiration(); this.cachedSize = message != null ? message.getSize() : 0; } @@ -82,6 +85,7 @@ public class IndirectMessageReference implements MessageReference { this.groupID = message.getGroupID(); this.groupSequence = message.getGroupSequence(); this.targetConsumerId=message.getTargetConsumerId(); + this.expiration = message.getExpiration(); this.referenceCount=1; message.incrementReferenceCount(); @@ -207,6 +211,18 @@ public class IndirectMessageReference implements MessageReference { return targetConsumerId; } + public long getExpiration() { + return expiration; + } + + public boolean isExpired() { + long expireTime = getExpiration(); + if (expireTime > 0 && System.currentTimeMillis() > expireTime) { + return true; + } + return false; + } + public int getSize(){ Message msg = message; if (msg != null){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java index b8c48f397d..7a24ae4d3e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java @@ -47,5 +47,11 @@ public interface MessageReference { public int decrementReferenceCount(); public ConsumerId getTargetConsumerId(); public int getSize(); + public long getExpiration(); + + /** + * Returns true if this message is expired + */ + public boolean isExpired(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index ffd3511807..7c6b5b5913 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -72,13 +72,23 @@ public class TopicSubscription extends AbstractSubscription{ synchronized(matchedListMutex){ matched.addLast(node); // NOTE - be careful about the slaveBroker! - if(maximumPendingMessages>0){ + if (maximumPendingMessages > 0) { + + // calculate the high water mark from which point we will eagerly evict expired messages + int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); + if (maximumPendingMessages > 0 && maximumPendingMessages < max) { + max = maximumPendingMessages; + } + if (!matched.isEmpty() && matched.size() > max) { + removeExpiredMessages(matched); + } + // lets discard old messages as we are a slow consumer - while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){ - MessageReference oldMessage=messageEvictionStrategy.evictMessage(matched); + while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { + MessageReference oldMessage = messageEvictionStrategy.evictMessage(matched); oldMessage.decrementReferenceCount(); discarded++; - if (log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Discarding message " + oldMessage); } } @@ -88,6 +98,22 @@ public class TopicSubscription extends AbstractSubscription{ } } + /** + * Discard any expired messages from the matched list. Called from a synchronized block. + * @throws IOException + */ + protected void removeExpiredMessages(LinkedList messages) throws IOException { + for(Iterator i=matched.iterator();i.hasNext();){ + MessageReference node=(MessageReference) i.next(); + if (node.isExpired()) { + i.remove(); + dispatched.incrementAndGet(); + node.decrementReferenceCount(); + break; + } + } + } + public void processMessageDispatchNotification(MessageDispatchNotification mdn){ synchronized(matchedListMutex){ for(Iterator i=matched.iterator();i.hasNext();){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java index dae96c6f4e..030577a388 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java @@ -37,4 +37,9 @@ public interface MessageEvictionStrategy { */ MessageReference evictMessage(LinkedList messages) throws IOException; + /** + * REturns the high water mark on which we will eagerly evict expired messages from RAM + */ + int getEvictExpiredMessagesHighWatermark(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java new file mode 100644 index 0000000000..6e24572f52 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java @@ -0,0 +1,40 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +/** + * A useful base class for implementation inheritence. + * + * @version $Revision$ + */ +public abstract class MessageEvictionStrategySupport implements MessageEvictionStrategy { + + private int evictExpiredMessagesHighWatermark = 1000; + + public int getEvictExpiredMessagesHighWatermark() { + return evictExpiredMessagesHighWatermark; + } + + /** + * Sets the high water mark on which we will eagerly evict expired messages from RAM + */ + public void setEvictExpiredMessagesHighWatermark(int evictExpiredMessagesHighWaterMark) { + this.evictExpiredMessagesHighWatermark = evictExpiredMessagesHighWaterMark; + } + + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java index a4ef1db2c2..791666b018 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageEvictionStrategy.java @@ -28,7 +28,7 @@ import java.util.LinkedList; * * @version $Revision$ */ -public class OldestMessageEvictionStrategy implements MessageEvictionStrategy { +public class OldestMessageEvictionStrategy extends MessageEvictionStrategySupport { public MessageReference evictMessage(LinkedList messages) { return (MessageReference) messages.removeFirst(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java index 6fad3212ce..73fda4a82a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/OldestMessageWithLowestPriorityEvictionStrategy.java @@ -29,7 +29,7 @@ import java.util.LinkedList; * * @version $Revision$messageEvictionStrategy */ -public class OldestMessageWithLowestPriorityEvictionStrategy implements MessageEvictionStrategy { +public class OldestMessageWithLowestPriorityEvictionStrategy extends MessageEvictionStrategySupport { public MessageReference evictMessage(LinkedList messages) throws IOException { byte lowestPriority = Byte.MAX_VALUE;