ARTEMIS-2434 Improving Consumer/Queue Delivery Lock

This is a less invasive improvement then the one I proposed at PR #2772
or commit 7507a9fd4b
This commit is contained in:
Clebert Suconic 2019-07-27 10:58:39 -04:00
parent 8a1f267bd5
commit 28ea18ea79
2 changed files with 14 additions and 38 deletions

View File

@ -27,8 +27,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -65,6 +63,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
@ -107,12 +106,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private SlowConsumerDetectionListener slowConsumerListener;
/**
* We get a readLock when a message is handled, and return the readLock when the message is finally delivered
* When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
* otherwise a rollback may get message sneaking in
*/
private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
private final ReusableLatch pendingDelivery = new ReusableLatch(0);
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
@ -481,7 +475,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
lockDelivery.readLock().lock();
pendingDelivery.countUp();
return HandleStatus.HANDLED;
}
@ -510,7 +504,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
deliverStandardMessage(reference, message);
}
} finally {
lockDelivery.readLock().unlock();
pendingDelivery.countDown();
callback.afterDelivery();
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
@ -730,30 +724,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
boolean locked = lockDelivery();
// This is to make sure nothing would sneak to the client while started = false
// the client will stop the session and perform a rollback in certain cases.
// in case something sneaks to the client you could get to messaging delivering forever until
// you restart the server
try {
this.started = browseOnly || started;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
this.started = browseOnly || started;
}
// Outside the lock
if (started) {
promptDelivery();
} else {
flushDelivery();
}
}
private boolean lockDelivery() {
private boolean flushDelivery() {
try {
if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
if (!pendingDelivery.await(30, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
if (server != null) {
server.threadDump();
@ -770,16 +754,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setTransferring(final boolean transferring) {
synchronized (lock) {
// This is to make sure that the delivery process has finished any pending delivery
// otherwise a message may sneak in on the client while we are trying to stop the consumer
boolean locked = lockDelivery();
try {
this.transferring = transferring;
} finally {
if (locked) {
lockDelivery.writeLock().unlock();
}
}
this.transferring = transferring;
}
// Outside the lock
@ -801,6 +776,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (!transferring) {
promptDelivery();
} else {
flushDelivery();
}
}
@ -1275,7 +1252,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
public boolean deliver() throws Exception {
lockDelivery.readLock().lock();
pendingDelivery.countUp();
try {
if (!started) {
return false;
@ -1392,7 +1369,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return true;
} finally {
lockDelivery.readLock().unlock();
pendingDelivery.countDown();
}
}

View File

@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
}
Thread.sleep(2000);
//session.rollback();
//session.close();
//consume all msgs from 2nd first consumer