diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 81e7f2ef2e..0b289379d7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -99,6 +99,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us this.active = true; this.context = context; this.info = info; + int prefetch = info.getPrefetchSize(); + if (prefetch>0) { + prefetch += prefetch/2; + } + int depth = Math.max(prefetch, this.pending.getMaxAuditDepth()); + this.pending.setMaxAuditDepth(depth); if (!keepDurableSubsActive) { for (Iterator iter = destinations.values() .iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index a6720c6ac1..8e7b7902a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -127,25 +127,29 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public void add(MessageReference node) throws Exception { - boolean pendingEmpty = false; - synchronized(pendingLock) { - pendingEmpty = pending.isEmpty(); - } - enqueueCounter++; - if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) { - dispatch(node); - } else { - optimizePrefetch(); - synchronized(pendingLock) { - if (pending.isEmpty() && LOG.isDebugEnabled()) { - LOG.debug("Prefetch limit."); - } - pending.addMessageLast(node); - - } - dispatchPending(); - } - } + boolean pendingEmpty = false; + boolean dispatchPending = false; + synchronized (pendingLock) { + pendingEmpty = pending.isEmpty(); + enqueueCounter++; + if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) { + pending.dispatched(node); + dispatch(node); + } else { + optimizePrefetch(); + synchronized (pendingLock) { + if (pending.isEmpty() && LOG.isDebugEnabled()) { + LOG.debug("Prefetch limit."); + } + pending.addMessageLast(node); + dispatchPending = true; + } + } + } + if (dispatchPending) { + dispatchPending(); + } + } public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { synchronized(pendingLock) { @@ -511,8 +515,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { final Message message = node.getMessage(); if (message == null) { return false; - } - + } // Make sure we can dispatch a message. if (canDispatch(node) && !isSlave()) { MessageDispatch md = createMessageDispatch(node, message); @@ -520,11 +523,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (node != QueueMessageReference.NULL_MESSAGE) { dispatchCounter++; dispatched.add(node); - if(pending != null) { - synchronized(pendingLock) { - pending.dispatched(message); - } - } } else { prefetchExtension = Math.max(0, prefetchExtension - 1); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index a143319ed6..5633bf3d12 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -245,9 +245,26 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { * Mark a message as already dispatched * @param message */ - public void dispatched(MessageReference message) { + public void dispatched(MessageReference message) { + //add it to the audit + isDuplicate(message.getMessageId()); + } + + /** + * set the audit + * @param audit + */ + public void setMessageAudit(ActiveMQMessageAudit audit) { + this.audit=audit; + } + + + /** + * @return the audit + */ + public ActiveMQMessageAudit getMessageAudit() { + return audit; } - protected synchronized boolean isDuplicate(MessageId messageId) { if (!this.enableAudit || this.audit==null) { @@ -265,6 +282,4 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { protected synchronized boolean isStarted() { return started; } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 13e3fad91c..20011fe6b2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -19,10 +19,12 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.LinkedList; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; /** @@ -253,6 +255,18 @@ public interface PendingMessageCursor extends Service { * @param message */ public void dispatched(MessageReference message); + + /** + * set the audit + * @param audit + */ + public void setMessageAudit(ActiveMQMessageAudit audit); + + + /** + * @return the audit - could be null + */ + public ActiveMQMessageAudit getMessageAudit(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 8f2dc75208..cd9a9ab584 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -74,6 +74,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { started = true; super.start(); for (PendingMessageCursor tsp : storePrefetches) { + tsp.setMessageAudit(getMessageAudit()); tsp.start(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 5521a648d2..dd9513cdaa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -66,7 +66,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); } + nonPersistent.setMessageAudit(getMessageAudit()); nonPersistent.start(); + persistent.setMessageAudit(getMessageAudit()); persistent.start(); pendingCount = persistent.size() + nonPersistent.size(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index a59100a88d..55d254efd2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -30,7 +30,6 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.commons.logging.Log; @@ -51,7 +50,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message private String subscriberName; private Destination regionDestination; private boolean batchResetNeeded = true; - private boolean storeMayHaveMoreMessages = true; + private boolean storeHasMessages = false; private boolean started; private final Subscription subscription; @@ -74,8 +73,8 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message if (!started) { started = true; super.start(); + this.storeHasMessages = getStoreSize() > 0; getSystemUsage().getMemoryUsage().addUsageListener(this); - safeFillBatch(); } } @@ -104,14 +103,14 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message public synchronized void addMessageLast(MessageReference node) throws Exception { if (node != null) { - storeMayHaveMoreMessages=true; + storeHasMessages=true; node.decrementReferenceCount(); } } public synchronized void addMessageFirst(MessageReference node) throws Exception { if (node != null) { - storeMayHaveMoreMessages=true; + storeHasMessages=true; node.decrementReferenceCount(); rollback(node.getMessageId()); } @@ -168,8 +167,6 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message } batchList.put(message.getMessageId(), message); - }else { - this.storeMayHaveMoreMessages=true; } } return true; @@ -208,14 +205,13 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message if (batchResetNeeded) { this.store.resetBatching(clientId, subscriberName); this.batchResetNeeded = false; - this.storeMayHaveMoreMessages = true; } - while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) { - this.storeMayHaveMoreMessages = false; + while (this.batchList.isEmpty() && this.storeHasMessages) { + this.storeHasMessages = false; this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); if (!this.batchList.isEmpty()) { - this.storeMayHaveMoreMessages=true; + this.storeHasMessages=true; } } } @@ -240,7 +236,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) { if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { - storeMayHaveMoreMessages = true; + storeHasMessages = true; try { fillBatch(); } catch (Exception e) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java index 8148d21024..650dcc7afa 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java @@ -39,7 +39,7 @@ import org.apache.commons.logging.LogFactory; */ public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest { - static final int NMSG = 100; + static final int NMSG = 200; static final int MSIZE = 256000; private static final transient Log LOG = LogFactory.getLog(JmsDurableTopicSlowReceiveTest.class); private static final String COUNT_PROPERY_NAME = "count";