From 64ba930f43b8fa3a78730d875d7e354cb9631675 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 25 Jul 2019 09:23:47 +0200 Subject: [PATCH] ARTEMIS-2434 Don't lock ServerConsumerImpl for long period of time --- .../core/server/impl/ServerConsumerImpl.java | 79 +++++++++++-------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index c709d4ec77..e19b1e5c03 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -729,58 +730,68 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void setStarted(final boolean started) { - synchronized (lock) { - boolean locked = lockDelivery(); - + lockDelivery(locked -> { // This is to make sure nothing would sneak to the client while started = false // the client will stop the session and perform a rollback in certain cases. // in case something sneaks to the client you could get to messaging delivering forever until // you restart the server - try { - this.started = browseOnly || started; - } finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } - } - } - + this.started = browseOnly || started; + }); // Outside the lock if (started) { promptDelivery(); } } - private boolean lockDelivery() { - try { - if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) { - ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); - if (server != null) { - server.threadDump(); + private static final long LOCK_DELIVERY_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30); + private static final long TRY_LOCK_NS = TimeUnit.MILLISECONDS.toNanos(100); + + private boolean lockDelivery(java.util.function.Consumer task) { + final long startWait = System.nanoTime(); + long now; + while (((now = System.nanoTime()) - startWait) < LOCK_DELIVERY_TIMEOUT_NS) { + try { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e); + synchronized (lock) { + task.accept(false); } return false; } - return true; - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e); - return false; + synchronized (lock) { + if (lockDelivery.writeLock().tryLock()) { + try { + task.accept(true); + } finally { + lockDelivery.writeLock().unlock(); + } + return true; + } + } + //entering the lock can take some time: discount that time from the + //time before attempting to lock delivery + final long timeToLock = System.nanoTime() - now; + if (timeToLock < TRY_LOCK_NS) { + final long timeToWait = TRY_LOCK_NS - timeToLock; + LockSupport.parkNanos(timeToWait); + } } + ActiveMQServerLogger.LOGGER.timeoutLockingConsumer(); + if (server != null) { + server.threadDump(); + } + synchronized (lock) { + task.accept(false); + } + return false; } @Override public void setTransferring(final boolean transferring) { - synchronized (lock) { - // This is to make sure that the delivery process has finished any pending delivery - // otherwise a message may sneak in on the client while we are trying to stop the consumer - boolean locked = lockDelivery(); - try { - this.transferring = transferring; - } finally { - if (locked) { - lockDelivery.writeLock().unlock(); - } - } - } + lockDelivery(locked -> this.transferring = transferring); // Outside the lock if (transferring) {