When pulling a message, iterate the destinations first to make sure that it has pushed all available messages to

the sub.  This should fix the ZeroPrefetchTest that was intermitently failing on slower machines.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@643390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-04-01 13:22:48 +00:00
parent 3eb6aed961
commit 2a328ed27c
4 changed files with 83 additions and 65 deletions

View File

@ -27,12 +27,13 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.usage.MemoryUsage;
/**
* @version $Revision: 1.12 $
*/
public interface Destination extends Service {
public interface Destination extends Service, Task {
void addSubscription(ConnectionContext context, Subscription sub) throws Exception;

View File

@ -206,4 +206,8 @@ public class DestinationFilter implements Destination {
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
next.messageExpired(context, prefetchSubscription, node);
}
public boolean iterate() {
return next.iterate();
}
}

View File

@ -79,32 +79,43 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* Allows a message to be pulled on demand by a client
*/
public synchronized Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages. TODO: when the slave
// becomes a master,
// He should send a NULL message to all the consumers to 'wake them up'
// in case
// they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlave()) {
prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter;
dispatchPending();
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
// imediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
}
if (pull.getTimeout() > 0) {
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
}
final long dispatchCounterBeforePull;
synchronized(this) {
prefetchExtension++;
dispatchCounterBeforePull = dispatchCounter;
}
// Have the destination push us some messages.
for (Destination dest : destinations) {
dest.iterate();
}
dispatchPending();
synchronized(this) {
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
// imediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
}
if (pull.getTimeout() > 0) {
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
}
}
}
}
return null;

View File

@ -99,6 +99,7 @@ public class Queue extends BaseDestination implements Task {
wakeup();
}
};
private final Object iteratingMutex = new Object() {};
private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
@ -914,51 +915,52 @@ public class Queue extends BaseDestination implements Task {
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
RecoveryDispatch rd;
while ((rd = getNextRecoveryDispatch()) != null) {
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
for (QueueMessageReference node : rd.messages) {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) {
rd.subscription.add(node);
}
}
}
if( rd.subscription instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
}
} catch (Exception e) {
e.printStackTrace();
}
synchronized(iteratingMutex) {
RecoveryDispatch rd;
while ((rd = getNextRecoveryDispatch()) != null) {
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
for (QueueMessageReference node : rd.messages) {
if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (rd.subscription.matches(node, msgContext)) {
rd.subscription.add(node);
}
}
}
if( rd.subscription instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
}
} catch (Exception e) {
e.printStackTrace();
}
}
boolean result = false;
synchronized (messages) {
result = !messages.isEmpty();
}
if (result) {
try {
pageInMessages(false);
} catch (Throwable e) {
log.error("Failed to page in more queue messages ", e);
}
}
synchronized(messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
}
return false;
}
boolean result = false;
synchronized (messages) {
result = !messages.isEmpty();
}
if (result) {
try {
pageInMessages(false);
} catch (Throwable e) {
log.error("Failed to page in more queue messages ", e);
}
}
synchronized(messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
}
return false;
}
protected MessageReferenceFilter createMessageIdFilter(final String messageId) {