Prevent any calls to wakeup becoming recursive calls into iterate() and instead queue a wakeup so that we don't miss dispatching any messages as things change in the memory usage.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1241077 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-02-06 16:59:07 +00:00
parent 1a566ecff7
commit a1d5ff0316
1 changed files with 12 additions and 7 deletions

View File

@ -82,8 +82,6 @@ import org.slf4j.MDC;
/** /**
* The Queue is a List of MessageEntry objects that are dispatched to matching * The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions. * subscriptions.
*
*
*/ */
public class Queue extends BaseDestination implements Task, UsageListener { public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); protected static final Logger LOG = LoggerFactory.getLogger(Queue.class);
@ -105,12 +103,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
final Lock sendLock = new ReentrantLock(); final Lock sendLock = new ReentrantLock();
private ExecutorService executor; private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private boolean useConsumerPriority = true; private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false; private boolean strictOrderDispatch = false;
private final QueueDispatchSelector dispatchSelector; private final QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch = false; private boolean optimizedDispatch = false;
private boolean iterationRunning = false;
private boolean firstConsumer = false; private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0; private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0;
@ -1403,6 +1401,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
boolean pageInMoreMessages = false; boolean pageInMoreMessages = false;
synchronized (iteratingMutex) { synchronized (iteratingMutex) {
// If optimize dispatch is on or this is a slave this method could be called recursively
// we set this state value to short-circuit wakeup in those cases to avoid that as it
// could lead to errors.
iterationRunning = true;
// do early to allow dispatch of these waiting messages // do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) { synchronized (messagesWaitingForSpace) {
Iterator<Runnable> it = messagesWaitingForSpace.values().iterator(); Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
@ -1454,14 +1457,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messagesLock.readLock().lock(); messagesLock.readLock().lock();
try{ try{
pageInMoreMessages |= !messages.isEmpty(); pageInMoreMessages |= !messages.isEmpty();
}finally { } finally {
messagesLock.readLock().unlock(); messagesLock.readLock().unlock();
} }
pagedInPendingDispatchLock.readLock().lock(); pagedInPendingDispatchLock.readLock().lock();
try { try {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
}finally { } finally {
pagedInPendingDispatchLock.readLock().unlock(); pagedInPendingDispatchLock.readLock().unlock();
} }
@ -1517,6 +1520,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
pendingWakeups.decrementAndGet(); pendingWakeups.decrementAndGet();
} }
MDC.remove("activemq.destination"); MDC.remove("activemq.destination");
iterationRunning = false;
return pendingWakeups.get() > 0; return pendingWakeups.get() > 0;
} }
} }
@ -1677,7 +1682,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
public void wakeup() { public void wakeup() {
if (optimizedDispatch || isSlave()) { if ((optimizedDispatch || isSlave()) && !iterationRunning) {
iterate(); iterate();
pendingWakeups.incrementAndGet(); pendingWakeups.incrementAndGet();
} else { } else {