From 2a328ed27cd5f19b0c96aeda7176a6aa146ddf6a Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 1 Apr 2008 13:22:48 +0000 Subject: [PATCH] When pulling a message, iterate the destinations first to make sure that it has pushed all available messages to the sub. This should fix the ZeroPrefetchTest that was intermitently failing on slower machines. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@643390 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/region/Destination.java | 3 +- .../broker/region/DestinationFilter.java | 4 + .../broker/region/PrefetchSubscription.java | 51 ++++++----- .../apache/activemq/broker/region/Queue.java | 90 ++++++++++--------- 4 files changed, 83 insertions(+), 65 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index b470ee5efb..8cb9d22e93 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -27,12 +27,13 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.thread.Task; import org.apache.activemq.usage.MemoryUsage; /** * @version $Revision: 1.12 $ */ -public interface Destination extends Service { +public interface Destination extends Service, Task { void addSubscription(ConnectionContext context, Subscription sub) throws Exception; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 7635675871..93fa409b06 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -206,4 +206,8 @@ public class DestinationFilter implements Destination { public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) { next.messageExpired(context, prefetchSubscription, node); } + + public boolean iterate() { + return next.iterate(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 20079598c4..d84544e114 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -79,32 +79,43 @@ public abstract class PrefetchSubscription extends AbstractSubscription { /** * Allows a message to be pulled on demand by a client */ - public synchronized Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { + public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { // The slave should not deliver pull messages. TODO: when the slave // becomes a master, // He should send a NULL message to all the consumers to 'wake them up' // in case // they were waiting for a message. if (getPrefetchSize() == 0 && !isSlave()) { - prefetchExtension++; - final long dispatchCounterBeforePull = dispatchCounter; - dispatchPending(); - // If there was nothing dispatched.. we may need to setup a timeout. - if (dispatchCounterBeforePull == dispatchCounter) { - // imediate timeout used by receiveNoWait() - if (pull.getTimeout() == -1) { - // Send a NULL message. - add(QueueMessageReference.NULL_MESSAGE); - dispatchPending(); - } - if (pull.getTimeout() > 0) { - Scheduler.executeAfterDelay(new Runnable() { - - public void run() { - pullTimeout(dispatchCounterBeforePull); - } - }, pull.getTimeout()); - } + final long dispatchCounterBeforePull; + synchronized(this) { + prefetchExtension++; + dispatchCounterBeforePull = dispatchCounter; + } + + // Have the destination push us some messages. + for (Destination dest : destinations) { + dest.iterate(); + } + dispatchPending(); + + synchronized(this) { + // If there was nothing dispatched.. we may need to setup a timeout. + if (dispatchCounterBeforePull == dispatchCounter) { + // imediate timeout used by receiveNoWait() + if (pull.getTimeout() == -1) { + // Send a NULL message. + add(QueueMessageReference.NULL_MESSAGE); + dispatchPending(); + } + if (pull.getTimeout() > 0) { + Scheduler.executeAfterDelay(new Runnable() { + + public void run() { + pullTimeout(dispatchCounterBeforePull); + } + }, pull.getTimeout()); + } + } } } return null; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 21c2c3d7a8..c2616f6705 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -99,6 +99,7 @@ public class Queue extends BaseDestination implements Task { wakeup(); } }; + private final Object iteratingMutex = new Object() {}; private static final ComparatororderedCompare = new Comparator() { @@ -914,51 +915,52 @@ public class Queue extends BaseDestination implements Task { * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { - - RecoveryDispatch rd; - while ((rd = getNextRecoveryDispatch()) != null) { - try { - MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); - msgContext.setDestination(destination); - - for (QueueMessageReference node : rd.messages) { - if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) { - msgContext.setMessageReference(node); - if (rd.subscription.matches(node, msgContext)) { - rd.subscription.add(node); - } - } - } - - if( rd.subscription instanceof QueueBrowserSubscription ) { - ((QueueBrowserSubscription)rd.subscription).decrementQueueRef(); - } - - } catch (Exception e) { - e.printStackTrace(); - } + synchronized(iteratingMutex) { + RecoveryDispatch rd; + while ((rd = getNextRecoveryDispatch()) != null) { + try { + MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); + msgContext.setDestination(destination); + + for (QueueMessageReference node : rd.messages) { + if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) { + msgContext.setMessageReference(node); + if (rd.subscription.matches(node, msgContext)) { + rd.subscription.add(node); + } + } + } + + if( rd.subscription instanceof QueueBrowserSubscription ) { + ((QueueBrowserSubscription)rd.subscription).decrementQueueRef(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + boolean result = false; + synchronized (messages) { + result = !messages.isEmpty(); + } + + if (result) { + try { + pageInMessages(false); + + } catch (Throwable e) { + log.error("Failed to page in more queue messages ", e); + } + } + synchronized(messagesWaitingForSpace) { + while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { + Runnable op = messagesWaitingForSpace.removeFirst(); + op.run(); + } + } + return false; } - - boolean result = false; - synchronized (messages) { - result = !messages.isEmpty(); - } - - if (result) { - try { - pageInMessages(false); - - } catch (Throwable e) { - log.error("Failed to page in more queue messages ", e); - } - } - synchronized(messagesWaitingForSpace) { - while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) { - Runnable op = messagesWaitingForSpace.removeFirst(); - op.run(); - } - } - return false; } protected MessageReferenceFilter createMessageIdFilter(final String messageId) {