diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 53ed8a65ed..5034b1046a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -122,4 +122,8 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{ public void release(){ } + + public boolean hasMessagesBufferedToDeliver() { + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 01387f869e..057adab940 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -213,6 +213,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple // we always have space - as we can persist to disk return false; } + + public boolean hasMessagesBufferedToDeliver() { + return !isEmpty(); + } public void setUsageManager(UsageManager usageManager){ super.setUsageManager(usageManager); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 5acf4afcbd..357d92f470 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -166,4 +166,9 @@ public interface PendingMessageCursor extends Service{ * @return true if the cursor is full */ public boolean isFull(); + + /** + * @return true if the cursor has buffered messages ready to deliver + */ + public boolean hasMessagesBufferedToDeliver(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 1232ff061f..42233295df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.commons.logging.Log; @@ -43,6 +44,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements private MessageStore store; private final LinkedList batchList=new LinkedList(); private Destination regionDestination; + private int size = 0; /** * @param topic @@ -68,26 +70,48 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements * @return true if there are no pending messages */ public boolean isEmpty(){ - return batchList.isEmpty(); + return size <= 0; + } + + public boolean hasMessagesBufferedToDeliver() { + return !batchList.isEmpty(); } public synchronized int size(){ try { - return store.getMessageCount(); + size = store.getMessageCount(); }catch(IOException e) { log.error("Failed to get message count",e); throw new RuntimeException(e); } + return size; } public synchronized void addMessageLast(MessageReference node) throws Exception{ if(node!=null){ node.decrementReferenceCount(); } + size++; + } + + public void addMessageFirst(MessageReference node) throws Exception{ + if(node!=null){ + node.decrementReferenceCount(); + } + size++; + } + + public void remove(){ + size--; } + public void remove(MessageReference node){ + size--; + } + + public synchronized boolean hasNext(){ - if(isEmpty()){ + if(batchList.isEmpty()){ try{ fillBatch(); }catch(Exception e){ @@ -95,7 +119,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements throw new RuntimeException(e); } } - return !isEmpty(); + return !batchList.isEmpty(); } public synchronized MessageReference next(){ @@ -117,10 +141,15 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements batchList.addLast(message); } - public void recoverMessageReference(String messageReference) - throws Exception{ - // shouldn't get called - throw new RuntimeException("Not supported"); + public void recoverMessageReference(String messageReference) throws Exception{ + Message msg=store.getMessage(new MessageId(messageReference)); + if(msg!=null){ + recoverMessage(msg); + }else{ + String err = "Failed to retrieve message for id: "+messageReference; + log.error(err); + throw new IOException(err); + } } public void gc() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index bef30a39a0..bdf71b9cbf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -37,6 +37,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ private QueueStorePrefetch persistent; private boolean started; private PendingMessageCursor currentCursor; + /** * Construct @@ -48,6 +49,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ this.queue=queue; this.tmpStore=tmpStore; this.persistent=new QueueStorePrefetch(queue); + currentCursor = persistent; } public synchronized void start() throws Exception{ @@ -134,7 +136,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ pendingCount--; } - public void remove(MessageReference node){ + public synchronized void remove(MessageReference node){ if (!node.isPersistent()) { nonPersistent.remove(node); }else { @@ -145,6 +147,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ public synchronized void reset(){ nonPersistent.reset(); + persistent.reset(); } public int size(){ @@ -208,8 +211,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor{ } protected synchronized PendingMessageCursor getNextCursor() throws Exception{ - if(currentCursor==null||currentCursor.isEmpty()){ + if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){ currentCursor=currentCursor==persistent?nonPersistent:persistent; + //sanity check + if (currentCursor.isEmpty()) { + currentCursor=currentCursor==persistent?nonPersistent:persistent; + } } return currentCursor; }