This commit is contained in:
Clebert Suconic 2019-02-05 15:51:26 -05:00
commit 8873beb828
1 changed files with 19 additions and 7 deletions

View File

@ -112,11 +112,12 @@ import org.jboss.logging.Logger;
*/
public class QueueImpl extends CriticalComponentImpl implements Queue {
protected static final int CRITICAL_PATHS = 4;
protected static final int CRITICAL_PATHS = 5;
protected static final int CRITICAL_PATH_ADD_TAIL = 0;
protected static final int CRITICAL_PATH_ADD_HEAD = 1;
protected static final int CRITICAL_DELIVER = 2;
protected static final int CRITICAL_CONSUMER = 3;
protected static final int CRITICAL_CHECK_DEPAGE = 4;
private static final Logger logger = Logger.getLogger(QueueImpl.class);
private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
@ -2409,7 +2410,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* This method will deliver as many messages as possible until all consumers are busy or there
* are no more matching or available messages.
*/
private void deliver() {
private boolean deliver() {
if (logger.isDebugEnabled()) {
logger.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
}
@ -2430,7 +2431,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync();
return;
return false;
}
if (System.currentTimeMillis() > timeout) {
@ -2440,7 +2441,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync();
return;
return false;
}
MessageReference ref;
@ -2451,7 +2452,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Need to do these checks inside the synchronized
if (paused || !canDispatch() && redistributor == null) {
return;
return false;
}
if (messageReferences.size() == 0) {
@ -2571,7 +2572,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
checkDepage();
return true;
}
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
@ -3423,13 +3424,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
// an asynchronous delivery
enterCritical(CRITICAL_DELIVER);
boolean needCheckDepage = false;
try {
synchronized (QueueImpl.this.deliverRunner) {
deliver();
needCheckDepage = deliver();
}
} finally {
leaveCritical(CRITICAL_DELIVER);
}
if (needCheckDepage) {
enterCritical(CRITICAL_CHECK_DEPAGE);
try {
checkDepage();
} finally {
leaveCritical(CRITICAL_CHECK_DEPAGE);
}
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDelivering(e);
} finally {