Fix for CursorDurableTest.

The TopicStorePrefetch was iterating items that were in the subscription but not added to the pending list.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@491346 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-12-30 23:49:03 +00:00
parent 2a682e2fb2
commit 7695676339
2 changed files with 51 additions and 23 deletions

View File

@ -406,6 +406,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pending.reset(); pending.reset();
while(pending.hasNext()&&!isFull()&&count<numberToDispatch){ while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
MessageReference node=pending.next(); MessageReference node=pending.next();
if ( node == null )
break;
if(canDispatch(node)){ if(canDispatch(node)){
pending.remove(); pending.remove();

View File

@ -20,7 +20,7 @@ package org.apache.activemq.broker.region.cursors;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import javax.jms.JMSException;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
@ -48,6 +48,10 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
private String subscriberName; private String subscriberName;
private Destination regionDestination; private Destination regionDestination;
boolean empty=true;
private MessageId firstMessageId;
private MessageId lastMessageId;
/** /**
* @param topic * @param topic
* @param clientId * @param clientId
@ -73,7 +77,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public boolean isEmpty(){ public boolean isEmpty(){
return batchList.isEmpty(); return empty;
} }
public synchronized int size(){ public synchronized int size(){
@ -86,27 +90,55 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
} }
public synchronized void addMessageLast(MessageReference node) throws Exception{ public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){ if(node!=null){
if( empty ) {
firstMessageId = node.getMessageId();
empty=false;
}
lastMessageId = node.getMessageId();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} }
public synchronized boolean hasNext(){ public synchronized boolean hasNext() {
if(isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
}
return !isEmpty(); return !isEmpty();
} }
public synchronized MessageReference next(){ public synchronized MessageReference next(){
Message result = (Message)batchList.removeFirst();
result.setRegionDestination(regionDestination); if( empty ) {
return result; return null;
} else {
// We may need to fill in the batch...
if(batchList.isEmpty()){
try{
fillBatch();
}catch(Exception e){
log.error("Failed to fill batch",e);
throw new RuntimeException(e);
}
if( batchList.isEmpty()) {
return null;
}
}
Message result = (Message)batchList.removeFirst();
if( firstMessageId != null ) {
// Skip messages until we get to the first message.
if( !result.getMessageId().equals(firstMessageId) )
return null;
firstMessageId = null;
}
if( lastMessageId != null ) {
if( result.getMessageId().equals(lastMessageId) ) {
empty=true;
}
}
result.setRegionDestination(regionDestination);
return result;
}
} }
public void reset(){ public void reset(){
@ -130,13 +162,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
// implementation // implementation
protected void fillBatch() throws Exception{ protected void fillBatch() throws Exception{
store.recoverNextMessages(clientId,subscriberName, store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
maxBatchSize,this);
// this will add more messages to the batch list
if(!batchList.isEmpty()){
Message message=(Message)batchList.getLast();
}
} }
public void gc() { public void gc() {