diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 14e8f4120c..983d618ebf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -246,15 +246,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { return false; } - /** - * Mark a message as already dispatched - * @param message - */ - public void dispatched(MessageReference message) { - //add it to the audit - isDuplicate(message.getMessageId()); - } - + /** * set the audit * @param audit diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 9a2576dee6..85c20c5b0f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -37,11 +37,11 @@ import org.apache.commons.logging.LogFactory; public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener { private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); protected final Destination regionDestination; - protected final LinkedHashMap batchList = new LinkedHashMap (); + private final LinkedHashMap batchList = new LinkedHashMap (); + private Iterator> iterator = null; protected boolean cacheEnabled=false; protected boolean batchResetNeeded = true; protected boolean storeHasMessages = false; - protected Iterator> iterator = null; protected int size; protected AbstractStoreCursor(Destination destination) { @@ -84,6 +84,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } message.incrementReferenceCount(); batchList.put(message.getMessageId(), message); + clearIterator(true); } else { if (LOG.isDebugEnabled()) { LOG.debug("Ignoring batched duplicated from store: " + message); @@ -102,11 +103,25 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i throw new RuntimeException(e); } } - this.iterator = this.batchList.entrySet().iterator(); + clearIterator(true); } - public void release() { + public synchronized void release() { + clearIterator(false); + } + + private synchronized void clearIterator(boolean ensureIterator) { + boolean haveIterator = this.iterator != null; this.iterator=null; + if(haveIterator&&ensureIterator) { + ensureIterator(); + } + } + + private synchronized void ensureIterator() { + if(this.iterator==null) { + this.iterator=this.batchList.entrySet().iterator(); + } } @@ -117,16 +132,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (batchList.isEmpty()) { try { fillBatch(); - this.iterator = this.batchList.entrySet().iterator(); } catch (Exception e) { LOG.error("Failed to fill batch", e); throw new RuntimeException(e); } - }else { - if (this.iterator==null) { - this.iterator=this.batchList.entrySet().iterator(); - } } + ensureIterator(); return this.iterator.hasNext(); } @@ -192,6 +203,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i msg.decrementReferenceCount(); } batchList.clear(); + clearIterator(false); batchResetNeeded = true; this.cacheEnabled=false; if (isStarted()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index aa0304007a..22c12ed05c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -255,11 +255,6 @@ public interface PendingMessageCursor extends Service { */ public boolean isTransient(); - /** - * Mark a message as already dispatched - * @param message - */ - public void dispatched(MessageReference message); /** * set the audit diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index b7735440f6..c84d826da6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -300,17 +300,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } - /** - * Mark a message as already dispatched - * @param message - */ - public synchronized void dispatched(MessageReference message) { - super.dispatched(message); - for (PendingMessageCursor cursor : storePrefetches) { - cursor.dispatched(message); - } - } - protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { currentCursor = null; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 809ba2d531..1ae09c0c4f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -64,20 +64,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { throw new RuntimeException("Not supported"); } - /** - * Mark a message as already dispatched - * @param message - */ - public synchronized void dispatched(MessageReference message) { - if (this.audit != null) { - isDuplicate(message.getMessageId()); - Message removed = this.batchList.remove(message.getMessageId()); - if (removed != null) { - removed.decrementReferenceCount(); - } - } - } - + public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message);