From 28ea18ea790820257e279bd82d14aa857f37a80c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Sat, 27 Jul 2019 10:58:39 -0400 Subject: [PATCH] ARTEMIS-2434 Improving Consumer/Queue Delivery Lock This is a less invasive improvement then the one I proposed at PR #2772 or commit 7507a9fd4b282523c2b2f3517ed788153a35df4c --- .../core/server/impl/ServerConsumerImpl.java | 51 +++++-------------- .../integration/jms/client/GroupingTest.java | 1 - 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index c709d4ec77..568af35bca 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -65,6 +63,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.FutureLatch; +import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; @@ -107,12 +106,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private SlowConsumerDetectionListener slowConsumerListener; - /** - * We get a readLock when a message is handled, and return the readLock when the message is finally delivered - * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished - * otherwise a rollback may get message sneaking in - */ - private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock(); + private final ReusableLatch pendingDelivery = new ReusableLatch(0); private volatile AtomicInteger availableCredits = new AtomicInteger(0); @@ -481,7 +475,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } - lockDelivery.readLock().lock(); + pendingDelivery.countUp(); return HandleStatus.HANDLED; } @@ -510,7 +504,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { deliverStandardMessage(reference, message); } } finally { - lockDelivery.readLock().unlock(); + pendingDelivery.countDown(); callback.afterDelivery(); if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference)); @@ -730,30 +724,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { synchronized (lock) { - boolean locked = lockDelivery(); - - // This is to make sure nothing would sneak to the client while started = false - // the client will stop the session and perform a rollback in certain cases. - // in case something sneaks to the client you could get to messaging delivering forever until - // you restart the server - try { - this.started = browseOnly || started; - } finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } - } + this.started = browseOnly || started; } // Outside the lock if (started) { promptDelivery(); + } else { + flushDelivery(); } } - private boolean lockDelivery() { + private boolean flushDelivery() { try { - if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { + if (!pendingDelivery.await(30, TimeUnit.SECONDS)) { ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); if (server != null) { server.threadDump(); @@ -770,16 +754,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setTransferring(final boolean transferring) { synchronized (lock) { - // This is to make sure that the delivery process has finished any pending delivery - // otherwise a message may sneak in on the client while we are trying to stop the consumer - boolean locked = lockDelivery(); - try { - this.transferring = transferring; - } finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } - } + this.transferring = transferring; } // Outside the lock @@ -801,6 +776,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (!transferring) { promptDelivery(); + } else { + flushDelivery(); } } @@ -1275,7 +1252,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } public boolean deliver() throws Exception { - lockDelivery.readLock().lock(); + pendingDelivery.countUp(); try { if (!started) { return false; @@ -1392,7 +1369,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return true; } finally { - lockDelivery.readLock().unlock(); + pendingDelivery.countDown(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java index ba8cd95304..fc32c44c2d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java @@ -271,7 +271,6 @@ public class GroupingTest extends JMSTestBase { assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID); } - Thread.sleep(2000); //session.rollback(); //session.close(); //consume all msgs from 2nd first consumer