ARTEMIS-1604 Artemis deadlock using MQTT Protocol

Direct and async deliveries lock QueueImpl::this and
ServerConsumerImpl::this in different order causing deadlock:
has been introduced a deliverLock to prevent both type of delivers
to concurrently happen, making irrelevant the lock ordering.
This commit is contained in:
Francesco Nigro 2019-03-11 14:04:35 +01:00
parent a6dc57967b
commit c83fce8db4
1 changed files with 70 additions and 46 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -210,6 +211,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final Runnable deliverRunner = new DeliverRunner(); private final Runnable deliverRunner = new DeliverRunner();
//This lock is used to prevent deadlocks between direct and async deliveries
private final ReentrantLock deliverLock = new ReentrantLock();
private volatile boolean depagePending = false; private volatile boolean depagePending = false;
private final StorageManager storageManager; private final StorageManager storageManager;
@ -881,7 +885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return; return;
} }
if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { if (direct && supportsDirectDeliver && !directDeliver && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Checking to re-enable direct deliver on queue " + this.getName()); logger.trace("Checking to re-enable direct deliver on queue " + this.getName());
} }
@ -3069,57 +3073,74 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue * This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue
*/ */
private boolean deliverDirect(final MessageReference ref) { private boolean deliverDirect(final MessageReference ref) {
synchronized (this) { //The order to enter the deliverLock re QueueImpl::this lock is very important:
if (!supportsDirectDeliver) { //- acquire deliverLock::lock
return false; //- acquire QueueImpl::this lock
} //DeliverRunner::run is doing the same to avoid deadlocks.
if (paused || !canDispatch() && redistributor == null) { //Without deliverLock, a directDeliver happening while a DeliverRunner::run
return false; //could cause a deadlock.
} //Both DeliverRunner::run and deliverDirect could trigger a ServerConsumerImpl::individualAcknowledge:
//- deliverDirect first acquire QueueImpl::this, then ServerConsumerImpl::this
if (checkExpired(ref)) { //- DeliverRunner::run first acquire ServerConsumerImpl::this then QueueImpl::this
return true; if (!deliverLock.tryLock()) {
} logger.tracef("Cannot perform a directDelivery because there is a running async deliver");
return false;
consumers.reset(); }
try {
while (consumers.hasNext() || redistributor != null) { synchronized (this) {
if (!supportsDirectDeliver) {
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor; return false;
Consumer consumer = holder.consumer; }
if (paused || !canDispatch() && redistributor == null) {
final SimpleString groupID = extractGroupID(ref); return false;
Consumer groupConsumer = getGroupConsumer(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
} }
HandleStatus status = handle(ref, consumer); if (checkExpired(ref)) {
if (status == HandleStatus.HANDLED) {
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
consumers.reset();
return true; return true;
} }
if (redistributor != null || groupConsumer != null) { consumers.reset();
break;
}
}
if (logger.isTraceEnabled()) { while (consumers.hasNext() || redistributor != null) {
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
Consumer consumer = holder.consumer;
final SimpleString groupID = extractGroupID(ref);
Consumer groupConsumer = getGroupConsumer(groupID);
if (groupConsumer != null) {
consumer = groupConsumer;
}
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
messagesAdded.incrementAndGet();
deliveriesInTransit.countUp();
proceedDeliver(consumer, ref);
consumers.reset();
return true;
}
if (redistributor != null || groupConsumer != null) {
break;
}
}
if (logger.isTraceEnabled()) {
logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
}
return false;
} }
return false; } finally {
deliverLock.unlock();
} }
} }
@ -3464,8 +3485,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
enterCritical(CRITICAL_DELIVER); enterCritical(CRITICAL_DELIVER);
boolean needCheckDepage = false; boolean needCheckDepage = false;
try { try {
synchronized (QueueImpl.this.deliverRunner) { deliverLock.lock();
try {
needCheckDepage = deliver(); needCheckDepage = deliver();
} finally {
deliverLock.unlock();
} }
} finally { } finally {
leaveCritical(CRITICAL_DELIVER); leaveCritical(CRITICAL_DELIVER);