mirror of https://github.com/apache/activemq.git
added patch submitted by Mathew Kuppe for AMQ-871 to allow the eviction strategies to decide to evict multiple messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@430023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
75952abdbb
commit
171e3223c1
|
@ -94,11 +94,21 @@ public class TopicSubscription extends AbstractSubscription{
|
|||
|
||||
// lets discard old messages as we are a slow consumer
|
||||
while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
|
||||
MessageReference oldMessage = messageEvictionStrategy.evictMessage(matched);
|
||||
oldMessage.decrementReferenceCount();
|
||||
discarded++;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Discarding message " + oldMessage);
|
||||
MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(matched);
|
||||
int messagesToEvict = oldMessages.length;
|
||||
for(int i = 0; i < messagesToEvict; i++) {
|
||||
oldMessages[i].decrementReferenceCount();
|
||||
discarded++;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Discarding message " + oldMessages[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// lets avoid an infinite loop if we are given a bad eviction strategy
|
||||
// for a bad strategy lets just not evict
|
||||
if (messagesToEvict == 0) {
|
||||
log.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ public interface MessageEvictionStrategy {
|
|||
* @throws IOException if an exception occurs such as reading a message content (but should not ever happen
|
||||
* as usually all the messages will be in RAM when this method is called).
|
||||
*/
|
||||
MessageReference evictMessage(LinkedList messages) throws IOException;
|
||||
MessageReference[] evictMessages(LinkedList messages) throws IOException;
|
||||
|
||||
/**
|
||||
* REturns the high water mark on which we will eagerly evict expired messages from RAM
|
||||
|
|
|
@ -31,7 +31,7 @@ import java.util.LinkedList;
|
|||
*/
|
||||
public class OldestMessageEvictionStrategy extends MessageEvictionStrategySupport {
|
||||
|
||||
public MessageReference evictMessage(LinkedList messages) {
|
||||
return (MessageReference) messages.removeFirst();
|
||||
public MessageReference[] evictMessages(LinkedList messages) {
|
||||
return new MessageReference[] {(MessageReference) messages.removeFirst()};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import java.util.LinkedList;
|
|||
*/
|
||||
public class OldestMessageWithLowestPriorityEvictionStrategy extends MessageEvictionStrategySupport {
|
||||
|
||||
public MessageReference evictMessage(LinkedList messages) throws IOException {
|
||||
public MessageReference[] evictMessages(LinkedList messages) throws IOException {
|
||||
byte lowestPriority = Byte.MAX_VALUE;
|
||||
int pivot = 0;
|
||||
Iterator iter = messages.iterator();
|
||||
|
@ -44,6 +44,6 @@ public class OldestMessageWithLowestPriorityEvictionStrategy extends MessageEvic
|
|||
pivot = i;
|
||||
}
|
||||
}
|
||||
return (MessageReference) messages.remove(pivot);
|
||||
return new MessageReference[] {(MessageReference) messages.remove(pivot)};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue